Skip to main content

bark/persist/adaptor/
mod.rs

1//! Storage adaptor module providing the [StorageAdaptor] trait and blanket
2//! implementation of [BarkPersister] for any type implementing [StorageAdaptor].
3//!
4//! This module provides an optimized single-table storage abstraction that can be
5//! efficiently implemented on various backends (SQLite, Postgres, MongoDB, Firebase,
6//! in-memory, etc.).
7//!
8//! The design uses structured keys:
9//! - **Primary key (`pk`)**: Unique identifier for each record
10//! - **Partition key**: Groups related records for efficient querying
11//! - **Sort key**: Enables ordered iteration and range queries
12//!
13//! # Example
14//!
15//! ```rust
16//! # use bark::persist::adaptor::memory::MemoryStorageAdaptor;
17//! # use bark::persist::adaptor::{Query, Record, StorageAdaptor, SortKey};
18//!
19//! # async fn example() -> anyhow::Result<()> {
20//! // Create an in-memory storage adaptor
21//! let mut storage = MemoryStorageAdaptor::new();
22//!
23//! // Store a record sorted by a numeric field (ascending)
24//! let record = Record {
25//!     partition: 0,
26//!     pk: "item:1".into(),
27//!     sort_key: Some(SortKey::u32_asc(42)),
28//!     data: b"hello world".to_vec(),
29//! };
30//! storage.put(record).await?;
31//!
32//! // Query with efficient index scan
33//! let query = Query::new_full_range(0).limit(10);
34//! let records = storage.query_sorted(query).await?;
35//! # Ok(())
36//! # }
37//! ```
38
39mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::SortKey;
45
46
47use std::ops::RangeBounds;
48
49use anyhow::Context;
50use bitcoin::{Amount, Transaction, Txid};
51use bitcoin::secp256k1::PublicKey;
52use bitcoin::hashes::Hash;
53#[cfg(feature = "onchain-bdk")]
54use bdk_core::Merge;
55#[cfg(feature = "onchain-bdk")]
56use bdk_wallet::ChangeSet;
57use chrono::{DateTime, Local};
58use lightning_invoice::Bolt11Invoice;
59use serde::{de::DeserializeOwned, Serialize};
60
61use ark::lightning::{Invoice, PaymentHash, Preimage};
62use ark::{Vtxo, VtxoId};
63use ark::vtxo::Full;
64use bitcoin_ext::BlockDelta;
65
66use crate::exit::ExitTxOrigin;
67use crate::movement::{
68	Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
69};
70use crate::persist::BarkPersister;
71use crate::persist::models::{
72	LightningReceive, LightningSend, PendingBoard, PendingOffboard,
73	RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
74	StoredRoundState, Unlocked,
75};
76use crate::round::RoundState;
77use crate::vtxo::{VtxoState, VtxoStateKind};
78use crate::{WalletProperties, WalletVtxo};
79
80
/// Partition identifiers used as the `partition` component of every `Record`.
///
/// Each constant selects the partition holding one kind of wallet data.
pub mod partition {
	/// Wallet properties; a single record stored under an empty primary key.
	pub const PROPERTIES: u8 = 0;
	/// BDK wallet changeset; a single record stored under an empty primary key.
	///
	/// Only referenced when the `onchain-bdk` feature is enabled.
	#[allow(unused)]
	pub const BDK_CHANGESET: u8 = 1;
	/// VTXOs together with their state history, keyed by VTXO id.
	pub const VTXO: u8 = 2;
	/// VTXO keys, keyed by the serialized public key.
	pub const PUBLIC_KEY: u8 = 3;
	/// Pending boards, keyed by VTXO id.
	pub const PENDING_BOARD: u8 = 4;
	/// Round states, keyed by round state id.
	pub const ROUND_STATE: u8 = 5;
	/// Movements, keyed by movement id.
	pub const MOVEMENT: u8 = 6;
	/// Lightning sends, keyed by payment hash.
	pub const LIGHTNING_SEND: u8 = 7;
	/// Lightning receives, keyed by payment hash.
	pub const LIGHTNING_RECEIVE: u8 = 8;
	/// Exit entries, keyed by VTXO id.
	pub const EXIT_VTXO: u8 = 9;
	/// Exit child transactions, keyed by exit txid.
	pub const EXIT_CHILD_TX: u8 = 10;
	/// Mailbox checkpoint; a single record stored under an empty primary key.
	pub const MAILBOX_CHECKPOINT: u8 = 11;
	/// Pending offboards, keyed by movement id.
	pub const PENDING_OFFBOARD: u8 = 12;
	/// An index table for payment method to movement
	pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;

	/// Last incremental id handed out per partition, keyed by the partition byte.
	pub const LAST_IDS: u8 = u8::MAX;
}
101
102/// A storage record with structured keys.
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104pub struct Record {
105	/// Partition key for grouping related records (e.g., "vtxo", "movement").
106	///
107	/// Queries always filter by partition.
108	pub partition: u8,
109
110	/// Unique primary key
111	pub pk: Vec<u8>,
112
113	/// Optional sort key for ordered iteration within a partition.
114	///
115	/// Use [`SortKey::builder()`] to construct composite keys with
116	/// mixed sort directions.
117	///
118	/// This field may be set or changed after record insertion.
119	/// Implementation should support updating the sort key of a
120	/// record post-insert if needed.
121	pub sort_key: Option<SortKey>,
122
123	/// The record data encoded as JSON.
124	pub data: Vec<u8>,
125}
126
127impl Record {
128	/// Converts the record data to a typed value.
129	fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
130		serde_json::from_slice(&self.data).map_err(Into::into)
131	}
132
133	/// Creates a new record from a typed value.
134	fn from_data<T: Serialize>(
135		partition: u8,
136		pk: &[u8],
137		sort_key: Option<SortKey>,
138		data: &T,
139	) -> anyhow::Result<Record> {
140		Ok(Record {
141			partition,
142			pk: pk.to_vec(),
143			sort_key,
144			data: serde_json::to_vec(data)?,
145		})
146	}
147}
148
149/// A range of sort keys.
150pub trait QueryRange: RangeBounds<SortKey> + Send {}
151
152impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
153
154/// Query specification for retrieving sorted records from a partition.
155#[derive(Debug, Clone)]
156pub struct Query<R: QueryRange> {
157	/// Partition to query (required).
158	pub partition: u8,
159
160	/// Range of sort keys to query.
161	pub range: R,
162
163	/// Maximum number of records to return.
164	pub limit: Option<usize>,
165}
166
167impl<R: QueryRange> Query<R> {
168	/// Creates a new query for the given partition.
169	pub fn new(partition: u8, range: R) -> Self {
170		Self {
171			partition,
172			range,
173			limit: None,
174		}
175	}
176
177	/// Sets the maximum number of records to return.
178	///
179	/// If the range is greater than the limit, the query will
180	/// return the first `limit` records.
181	pub fn limit(mut self, limit: usize) -> Self {
182		self.limit = Some(limit);
183		self
184	}
185}
186
187impl Query<std::ops::RangeFull> {
188	pub fn new_full_range(partition: u8) -> Self {
189		Self::new(partition, ..)
190	}
191}
192
193fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
194	let body = pm.value_string();
195
196	let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
197	buf.extend(pm.type_str().as_bytes().iter().copied());
198	// nb we need to separate with a non-utf8 byte
199	// we use 0xfe so that we can easily query on prefix from <type>0xfe .. <type>0xff
200	buf.push(0xfe);
201	buf.extend(body.into_bytes());
202	buf
203}
204
205/// Storage adaptor trait for persistence backends.
206///
207/// This trait provides a minimal interface (5 methods) that can be efficiently
208/// implemented on various storage backends while enabling query optimization.
209///
210/// # Implementor's Guide
211///
212/// ## Simple backends (memory, file-based)
213///
214/// Store records in a map/list keyed by `(partition, pk)`.
215///
216/// - `query_sorted`: query sorted records by partition, excluding those without
217///	  a sort key defined.
218/// - `get_all`: return every record in the partition regardless of whether it
219///   has a sort key. No ordering is guaranteed.
220///
221/// ## Database backends (Postgres, MongoDB, Firebase, IndexedDB, etc.)
222///
223/// Create a single table with indexes:
224///
225/// ```sql
226/// CREATE TABLE storage (
227///     pk TEXT PRIMARY KEY,
228///     partition TEXT NOT NULL,
229///     sort_key BLOB,
230///     data BLOB NOT NULL
231/// );
232/// CREATE INDEX idx_partition_sort ON storage(partition, sort_key);
233/// ```
234///
235/// Translate [`Query`] to SQL:
236///
237/// ```sql
238/// -- query_sorted: only returns records with a sort key
239/// SELECT * FROM storage
240/// WHERE partition = :partition AND sort_key IS NOT NULL
241/// ORDER BY sort_key DESC
242/// ```
243///
244/// Translate [`get_all`](StorageAdaptor::get_all) to SQL:
245///
246/// ```sql
247/// -- get_all: returns all records in the partition, no ordering
248/// SELECT * FROM storage
249/// WHERE partition = :partition
250/// ```
251#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
252#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
253pub trait StorageAdaptor: Send + Sync + 'static {
254	/// Stores a record, inserting or updating by primary key.
255	async fn put(&mut self, record: Record) -> anyhow::Result<()>;
256
257	/// Retrieves a record by primary key.
258	///
259	/// Returns `None` if the record doesn't exist.
260	async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
261
262	/// Deletes a record by primary key.
263	///
264	/// Returns the deleted record if it existed, `None` otherwise.
265	async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
266
267	/// Queries sorted records from a partition.
268	///
269	/// Results are ordered by sort key.
270	/// Records without a sort key must not be returned, to query all
271	/// records use [`StorageAdaptor::get_all`].
272	async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
273		-> anyhow::Result<Vec<Record>>;
274
275	/// Get all records in a partition.
276	///
277	/// No ordering guarantees are made.
278	async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
279
280	/// Increments the last partition id, then stores and returns the new id.
281	async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
282		let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
283			.map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
284		let next_partition_id = last_partition_id + 1;
285
286		let record = Record::from_data(
287			partition::LAST_IDS,
288			&[partition],
289			None,
290			&next_partition_id,
291		)?;
292
293		self.put(record).await?;
294		Ok(next_partition_id)
295	}
296}
297
298async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
299	match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
300		Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
301		None => Ok(None),
302	}
303}
304
305async fn get_check_vtxo_state<S: StorageAdaptor>(
306	adaptor: &S,
307	vtxo_id: VtxoId,
308	allowed_states: &[VtxoStateKind],
309) -> anyhow::Result<SerdeVtxo> {
310	let vtxo = get_vtxo(adaptor, vtxo_id).await?
311		.context("vtxo not found")?;
312
313	let current_state = vtxo.current_state().context("vtxo has no state")?;
314	if !allowed_states.contains(&current_state.kind()) {
315		bail!("current state {:?} not in allowed states {:?}",
316			current_state.kind(), allowed_states
317		);
318	}
319
320	Ok(vtxo)
321}
322
323async fn update_vtxo_state_checked<S: StorageAdaptor>(
324	adaptor: &mut S,
325	vtxo_id: VtxoId,
326	new_state: VtxoState,
327	allowed_old_states: &[VtxoStateKind],
328) -> anyhow::Result<WalletVtxo> {
329	let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
330
331	let sk = sort::vtxo_sort_key(
332		new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
333	);
334
335	serde_vtxo.states.push(new_state.clone());
336	let updated_record = Record::from_data(
337		partition::VTXO,
338		&vtxo_id.to_bytes(),
339		Some(sk),
340		&serde_vtxo,
341	)?;
342
343	adaptor.put(updated_record).await?;
344
345	Ok(WalletVtxo {
346		vtxo: serde_vtxo.vtxo,
347		state: new_state,
348	})
349}
350
351pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
352	inner: tokio::sync::RwLock<S>,
353}
354
355impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
356	pub fn new(inner: S) -> Self {
357		Self {
358			inner: tokio::sync::RwLock::new(inner),
359		}
360	}
361}
362
/// Implementation of `BarkPersister` for `StorageAdaptorWrapper`, backed by any `StorageAdaptor`.
364#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
365#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
366impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
367	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
368		let record = Record::from_data(
369			partition::PROPERTIES,
370			// NB: a single set of properties is stored, so no need for primary key
371			&[],
372			None,
373			properties,
374		)?;
375		self.inner.write().await.put(record).await
376	}
377
378	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
379		match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
380			Some(record) => Ok(Some(record.to_data()?)),
381			None => Ok(None),
382		}
383	}
384
385	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
386		let mut properties = match self.read_properties().await? {
387			Some(properties) => properties,
388			None => bail!("wallet not initialized"),
389		};
390
391		properties.server_pubkey = Some(server_pubkey);
392
393		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
394		self.inner.write().await.put(record).await
395	}
396
397	#[cfg(feature = "onchain-bdk")]
398	async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
399		match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
400			Some(record) => record.to_data(),
401			None => Ok(ChangeSet::default()),
402		}
403	}
404
405	#[cfg(feature = "onchain-bdk")]
406	async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
407		let mut current = self.initialize_bdk_wallet().await?;
408		current.merge(changeset.clone());
409
410		let record = Record::from_data(
411			partition::BDK_CHANGESET,
412			// NB: a single changeset is stored, so no need for primary key
413			&[],
414			None,
415			&current,
416		)?;
417		self.inner.write().await.put(record).await
418	}
419
420	async fn create_new_movement(
421		&self,
422		status: MovementStatus,
423		subsystem: &MovementSubsystem,
424		time: DateTime<Local>,
425	) -> anyhow::Result<MovementId> {
426		let mut lock = self.inner.write().await;
427
428		let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
429		let movement = Movement::new(id, status, subsystem, time);
430
431		let record = Record::from_data(
432			partition::MOVEMENT,
433			&id.to_bytes(),
434			Some(sort::movement_sort_key(&time)),
435			&movement,
436		)?;
437		lock.put(record).await?;
438
439		Ok(id)
440	}
441
442	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
443		let mut guard = self.inner.write().await;
444
445		let record = Record::from_data(
446			partition::MOVEMENT,
447			&movement.id.to_bytes(),
448			Some(sort::movement_sort_key(&movement.time.created_at)),
449			movement,
450		)?;
451		guard.put(record).await?;
452
453		// then add records for each payment method
454		let sent = movement.sent_to.iter().map(|d| &d.destination);
455		let rcvd = movement.received_on.iter().map(|d| &d.destination);
456		for pm in sent.chain(rcvd) {
457			let pm_bytes = serialize_payment_method(pm);
458			let primary_key = {
459				// We just need a unique key, but we will never query using this
460				let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
461				buf.extend(pm_bytes.iter().copied());
462				buf.extend(movement.id.to_bytes());
463				buf
464			};
465			let record = Record::from_data(
466				partition::MOVEMENT_PAYMENT_METHOD,
467				&primary_key,
468				Some(SortKey::from_bytes(pm_bytes)),
469				&movement.id.0,
470			)?;
471			guard.put(record).await?;
472		}
473
474		Ok(())
475	}
476
477	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
478		self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
479			.await?
480			.context("movement not found")?
481			.to_data()
482	}
483
484	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
485		let records = self.inner.read().await
486			.query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
487		records.into_iter().map(|r| r.to_data()).collect()
488	}
489
490	async fn get_movements_by_payment_method(
491		&self,
492		payment_method: &PaymentMethod,
493	) -> anyhow::Result<Vec<Movement>> {
494		let pm_bytes = serialize_payment_method(payment_method);
495		let sort_key = SortKey::from_bytes(pm_bytes);
496
497		let guard = self.inner.read().await;
498		let idx_recs = guard.query_sorted(Query::new(
499			partition::MOVEMENT_PAYMENT_METHOD,
500			sort_key.clone()..=sort_key,
501		)).await?;
502
503		let mut ret = Vec::with_capacity(idx_recs.len());
504		for idx_rec in idx_recs {
505			let id = MovementId::new(idx_rec.to_data::<u32>()
506				.context("corrupt db: movement payment method index value")?);
507
508			ret.push(
509				guard.get(partition::MOVEMENT, &id.to_bytes()).await?
510					.context("corrupt db: movement payment method entry for unknown movement")?
511					.to_data()
512					.context("corrupt db: invalid movement record")?
513			);
514		}
515		Ok(ret)
516	}
517
518	async fn store_pending_board(
519		&self,
520		vtxo: &Vtxo<Full>,
521		funding_tx: &Transaction,
522		movement_id: MovementId,
523	) -> anyhow::Result<()> {
524		let pending_board = PendingBoard {
525			vtxos: vec![vtxo.id()],
526			amount: vtxo.amount(),
527			funding_tx: funding_tx.clone(),
528			movement_id,
529		};
530
531		let record = Record::from_data(
532			partition::PENDING_BOARD,
533			&vtxo.id().to_bytes(),
534			None,
535			&pending_board,
536		)?;
537
538		self.inner.write().await.put(record).await
539	}
540
541	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
542		self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
543		Ok(())
544	}
545
546	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
547		let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
548		records
549			.into_iter()
550			.map(|r| {
551				let board = r.to_data::<PendingBoard>()?;
552				Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
553			})
554			.collect()
555	}
556
557	async fn get_pending_board_by_vtxo_id(
558		&self,
559		vtxo_id: VtxoId,
560	) -> anyhow::Result<Option<PendingBoard>> {
561		match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
562			Some(record) => Ok(Some(record.to_data()?)),
563			None => Ok(None),
564		}
565	}
566
567	async fn store_round_state_lock_vtxos(
568		&self,
569		round_state: &RoundState,
570	) -> anyhow::Result<RoundStateId> {
571		let mut lock = self.inner.write().await;
572
573		let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
574
575		let allowed_states = &[VtxoStateKind::Spendable];
576
577		// First check that the inputs are spendable
578		for vtxo in round_state.participation().inputs.iter() {
579			get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
580		}
581
582		for vtxo in round_state.participation().inputs.iter() {
583			update_vtxo_state_checked(
584				&mut *lock,
585				vtxo.id(),
586				VtxoState::Locked { movement_id: round_state.movement_id },
587				allowed_states,
588			).await?;
589		}
590
591		let serde_state = SerdeRoundState::from(round_state);
592		let record = Record::from_data(
593			partition::ROUND_STATE,
594			&id.to_bytes(),
595			Some(sort::SortKey::u32_asc(id.0)),
596			&serde_state,
597		)?;
598		lock.put(record).await?;
599
600		Ok(id)
601	}
602
603	async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
604		let serde_state = SerdeRoundState::from(round_state.state());
605		let record = Record::from_data(
606			partition::ROUND_STATE,
607			&round_state.id().to_bytes(),
608			Some(sort::SortKey::u32_asc(round_state.id().0)),
609			&serde_state,
610		)?;
611		self.inner.write().await.put(record).await
612	}
613
614	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
615		self.inner.write().await
616			.delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
617		Ok(())
618	}
619
620	async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
621		let record = self.inner.read().await
622			.get(partition::ROUND_STATE, &_id.to_bytes()).await?;
623		match record {
624			Some(r) => {
625				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
626				let id = RoundStateId(u32::from_be_bytes(pk_slice));
627				let state = r.to_data::<SerdeRoundState>()?.into();
628				Ok(Some(StoredRoundState::new(id, state)))
629			},
630			None => Ok(None),
631		}
632	}
633
634	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
635		let records = self.inner.read().await
636			.get_all(partition::ROUND_STATE).await?;
637		records.into_iter()
638			.map(|r| {
639				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
640				Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
641			})
642			.collect()
643	}
644
645	async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
646		let mut lock = self.inner.write().await;
647
648		for (vtxo, state) in vtxos {
649			let serde_vtxo = SerdeVtxo {
650				vtxo: (*vtxo).clone(),
651				states: vec![(*state).clone()],
652			};
653
654			let sk = sort::vtxo_sort_key(
655				state.kind(), vtxo.expiry_height(), vtxo.amount(),
656			);
657			let record = Record::from_data(
658				partition::VTXO,
659				&vtxo.id().to_bytes(),
660				Some(sk),
661				&serde_vtxo,
662			)?;
663			lock.put(record).await?;
664		}
665		Ok(())
666	}
667
668	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
669		let lock = self.inner.read().await;
670		match get_vtxo(&*lock, id).await? {
671			Some(vtxo) => Ok(Some(WalletVtxo {
672				state: vtxo.current_state().context("vtxo has no state")?.clone(),
673				vtxo: vtxo.vtxo,
674			})),
675			None => Ok(None),
676		}
677	}
678
679	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
680		let records = self.inner.read().await
681			.query_sorted(Query::new_full_range(partition::VTXO)).await?;
682
683		records
684			.into_iter()
685			.map(|r| {
686				let serde_vtxo = r.to_data::<SerdeVtxo>()?;
687				let state = serde_vtxo
688					.current_state()
689					.cloned()
690					.context("vtxo has no state")?;
691				Ok(WalletVtxo {
692					vtxo: serde_vtxo.vtxo,
693					state,
694				})
695			})
696			.collect()
697	}
698
699	async fn get_vtxos_by_state(
700		&self,
701		states: &[VtxoStateKind],
702	) -> anyhow::Result<Vec<WalletVtxo>> {
703		let lock = self.inner.read().await;
704
705		let range = |state: VtxoStateKind| {
706			let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
707			let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
708			(start, end)
709		};
710
711		let mut records = Vec::new();
712		for state in states {
713			let (start, end) = range(*state);
714			let query = Query::new(partition::VTXO, start..=end);
715
716			for record in lock.query_sorted(query).await? {
717				let serde_vtxo = record.to_data::<SerdeVtxo>()?;
718				let current_state = serde_vtxo.current_state()
719					.context("vtxo has no current state")?.clone();
720				debug_assert_eq!(current_state.kind(), *state);
721				records.push(WalletVtxo {
722					vtxo: serde_vtxo.vtxo,
723					state: current_state,
724				});
725			}
726		}
727
728		Ok(records)
729	}
730
731	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
732		match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
733			Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
734			None => Ok(None),
735		}
736	}
737
738	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
739		match self.get_wallet_vtxo(id).await? {
740			Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
741			None => Ok(false),
742		}
743	}
744
745	async fn update_vtxo_state_checked(
746		&self,
747		vtxo_id: VtxoId,
748		new_state: VtxoState,
749		allowed_old_states: &[VtxoStateKind],
750	) -> anyhow::Result<WalletVtxo> {
751		let mut lock = self.inner.write().await;
752		update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
753	}
754
755	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
756		let vtxo_key = SerdeVtxoKey { index, public_key };
757		let record = Record::from_data(
758			partition::PUBLIC_KEY,
759			&public_key.serialize()[..],
760			Some(sort::SortKey::u64_desc(index as u64)),
761			&vtxo_key,
762		)?;
763		self.inner.write().await.put(record).await
764	}
765
766	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
767		// pks are sorted descending, so the first one is the highest index
768		let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
769		let records = self.inner.read().await.query_sorted(query).await?;
770
771		match records.into_iter().next() {
772			Some(record) => {
773				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
774				Ok(Some(vtxo_key.index))
775			}
776			None => Ok(None),
777		}
778	}
779
780	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
781		match self.inner.read().await
782			.get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
783		{
784			Some(record) => {
785				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
786				Ok(Some(vtxo_key.index))
787			}
788			None => Ok(None),
789		}
790	}
791
792	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
793		match self.inner.read().await
794			.get(partition::MAILBOX_CHECKPOINT, &[]).await?
795		{
796			Some(record) => Ok(record.to_data::<u64>()?),
797			None => Ok(0),
798		}
799	}
800
801	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
802		let mut lock = self.inner.write().await;
803		let record = Record::from_data(
804			partition::MAILBOX_CHECKPOINT,
805			&[],
806			None,
807			&checkpoint,
808		)?;
809		lock.put(record).await?;
810		Ok(())
811	}
812
813	async fn store_new_pending_lightning_send(
814		&self,
815		invoice: &Invoice,
816		amount: Amount,
817		fee: Amount,
818		vtxo_ids: &[VtxoId],
819		movement_id: MovementId,
820	) -> anyhow::Result<LightningSend> {
821		let mut lock = self.inner.write().await;
822		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
823		for vtxo_id in vtxo_ids {
824			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
825				.context("vtxo not found")?;
826			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
827		}
828
829		let lightning_send = LightningSend {
830			invoice: invoice.clone(),
831			amount,
832			fee,
833			htlc_vtxos,
834			preimage: None,
835			movement_id,
836			finished_at: None,
837		};
838
839		let record = Record::from_data(
840			partition::LIGHTNING_SEND,
841			&invoice.payment_hash().to_byte_array(),
842			None,
843			&lightning_send,
844		)?;
845
846		lock.put(record).await?;
847
848		Ok(lightning_send)
849	}
850
851	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
852		let records = self.inner.read().await
853			.get_all(partition::LIGHTNING_SEND).await?;
854		records
855			.into_iter()
856			.filter_map(|r| {
857				let send = r.to_data::<LightningSend>().ok()?;
858				if send.finished_at.is_none() {
859					Some(Ok(send))
860				} else {
861					None
862				}
863			})
864			.collect()
865	}
866
867	async fn finish_lightning_send(
868		&self,
869		payment_hash: PaymentHash,
870		preimage: Option<Preimage>,
871	) -> anyhow::Result<()> {
872		let mut lock = self.inner.write().await;
873
874		let pk = payment_hash.to_byte_array();
875		let record = lock
876			.get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
877		let mut lightning_send: LightningSend = record.to_data()?;
878
879		lightning_send.preimage = preimage;
880		lightning_send.finished_at = Some(Local::now());
881
882		let updated_record = Record::from_data(
883			partition::LIGHTNING_SEND,
884			&pk,
885			None,
886			&lightning_send,
887		)?;
888		lock.put(updated_record).await?;
889
890		Ok(())
891	}
892
893	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
894		self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
895		Ok(())
896	}
897
898	async fn get_lightning_send(
899		&self,
900		payment_hash: PaymentHash,
901	) -> anyhow::Result<Option<LightningSend>> {
902		match self.inner.read().await
903			.get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
904		{
905			Some(record) => Ok(Some(record.to_data()?)),
906			None => Ok(None),
907		}
908	}
909
910	async fn store_lightning_receive(
911		&self,
912		payment_hash: PaymentHash,
913		preimage: Preimage,
914		invoice: &Bolt11Invoice,
915		htlc_recv_cltv_delta: BlockDelta,
916	) -> anyhow::Result<()> {
917		let lightning_receive = LightningReceive {
918			payment_hash,
919			payment_preimage: preimage,
920			invoice: invoice.clone(),
921			htlc_recv_cltv_delta,
922			htlc_vtxos: vec![],
923			movement_id: None,
924			finished_at: None,
925			preimage_revealed_at: None,
926		};
927
928		let record = Record::from_data(
929			partition::LIGHTNING_RECEIVE,
930			&payment_hash.to_byte_array(),
931			None,
932			&lightning_receive,
933		)?;
934		self.inner.write().await.put(record).await
935	}
936
937	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
938		let records = self.inner.read().await
939			.get_all(partition::LIGHTNING_RECEIVE).await?;
940		records
941			.into_iter()
942			.filter_map(|r| {
943				let receive = r.to_data::<LightningReceive>().ok()?;
944				if receive.finished_at.is_none() {
945					Some(Ok(receive))
946				} else {
947					None
948				}
949			})
950			.collect()
951	}
952
953	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
954		let mut lock = self.inner.write().await;
955
956		let pk = payment_hash.to_byte_array();
957		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
958			.context("lightning receive not found")?;
959		let mut lightning_receive: LightningReceive = record.to_data()?;
960
961		lightning_receive.preimage_revealed_at = Some(Local::now());
962
963		let updated_record = Record::from_data(
964			partition::LIGHTNING_RECEIVE,
965			&pk,
966			None,
967			&lightning_receive,
968		)?;
969		lock.put(updated_record).await
970	}
971
972	async fn update_lightning_receive(
973		&self,
974		payment_hash: PaymentHash,
975		vtxo_ids: &[VtxoId],
976		movement_id: MovementId,
977	) -> anyhow::Result<()> {
978		let mut lock = self.inner.write().await;
979		let pk = payment_hash.to_byte_array();
980		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
981			.context("lightning receive not found")?;
982		let mut lightning_receive: LightningReceive = record.to_data()?;
983
984		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
985		for vtxo_id in vtxo_ids {
986			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
987				.context("vtxo not found")?;
988			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
989		}
990
991		lightning_receive.htlc_vtxos = htlc_vtxos;
992		lightning_receive.movement_id = Some(movement_id);
993
994		let updated_record = Record::from_data(
995			partition::LIGHTNING_RECEIVE,
996			&pk,
997			None,
998			&lightning_receive,
999		)?;
1000		lock.put(updated_record).await
1001	}
1002
1003	async fn fetch_lightning_receive_by_payment_hash(
1004		&self,
1005		payment_hash: PaymentHash,
1006	) -> anyhow::Result<Option<LightningReceive>> {
1007		match self.inner.read().await
1008			.get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1009		{
1010			Some(record) => Ok(Some(record.to_data()?)),
1011			None => Ok(None),
1012		}
1013	}
1014
1015	async fn finish_pending_lightning_receive(
1016		&self,
1017		payment_hash: PaymentHash,
1018	) -> anyhow::Result<()> {
1019		let mut lock = self.inner.write().await;
1020		let pk = payment_hash.to_byte_array();
1021		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1022			.context("lightning receive not found")?;
1023		let mut lightning_receive: LightningReceive = record.to_data()?;
1024
1025		lightning_receive.finished_at = Some(Local::now());
1026
1027		let updated_record = Record::from_data(
1028			partition::LIGHTNING_RECEIVE,
1029			&pk,
1030			None,
1031			&lightning_receive,
1032		)?;
1033		lock.put(updated_record).await
1034	}
1035
1036	async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1037		let record = Record::from_data(
1038			partition::PENDING_OFFBOARD,
1039			&pending.movement_id.to_bytes(),
1040			None,
1041			pending,
1042		)?;
1043		self.inner.write().await.put(record).await
1044	}
1045
1046	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1047		let records = self.inner.read().await
1048			.get_all(partition::PENDING_OFFBOARD).await?;
1049		records.into_iter().map(|r| r.to_data()).collect()
1050	}
1051
1052	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1053		self.inner.write().await
1054			.delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1055		Ok(())
1056	}
1057
1058	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1059		let record = Record::from_data(
1060			partition::EXIT_VTXO,
1061			&exit.vtxo_id.to_bytes(),
1062			None,
1063			exit,
1064		)?;
1065		self.inner.write().await.put(record).await
1066	}
1067
1068	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1069		self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1070		Ok(())
1071	}
1072
1073	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1074		let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1075		records.into_iter().map(|r| r.to_data()).collect()
1076	}
1077
1078	async fn store_exit_child_tx(
1079		&self,
1080		exit_txid: Txid,
1081		child_tx: &Transaction,
1082		origin: ExitTxOrigin,
1083	) -> anyhow::Result<()> {
1084		let exit_child = SerdeExitChildTx {
1085			child_tx: child_tx.clone(),
1086			origin,
1087		};
1088		let record = Record::from_data(
1089			partition::EXIT_CHILD_TX,
1090			&exit_txid.to_byte_array(),
1091			None,
1092			&exit_child,
1093		)?;
1094		self.inner.write().await.put(record).await
1095	}
1096
1097	async fn get_exit_child_tx(
1098		&self,
1099		exit_txid: Txid,
1100	) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1101		match self.inner.read().await
1102			.get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1103		{
1104			Some(record) => {
1105				let exit_child = record.to_data::<SerdeExitChildTx>()?;
1106				Ok(Some((exit_child.child_tx, exit_child.origin)))
1107			}
1108			None => Ok(None),
1109		}
1110	}
1111}
1112
#[cfg(test)]
mod tests {
	use super::*;

	/// A query built with `new_full_range(..).limit(..)` keeps the partition,
	/// records the limit and leaves the range unbounded.
	#[test]
	fn storage_query_builder() {
		let built = Query::new_full_range(0).limit(10);

		assert_eq!(built.range, ..);
		assert_eq!(built.limit, Some(10));
		assert_eq!(built.partition, 0);
	}
}
1126
/// This module provides comprehensive tests for the methods of the
/// `StorageAdaptor` trait. Use these functions to validate custom implementations.
///
/// # Example
///
/// ```rust
/// use bark::persist::adaptor::test_suite;
///
/// #[tokio::test]
/// async fn test_my_custom_adaptor() {
///     let mut storage = MyCustomStorageAdaptor::new();
///     test_suite::run_all(&mut storage).await;
/// }
/// ```
1141#[cfg(test)]
1142pub mod test_suite {
1143	use super::*;
1144	use super::partition::LAST_IDS;
1145	use super::sort::SortKey;
1146
1147	async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1148		for partition in partitions {
1149			let records = storage.get_all(*partition).await?;
1150			for record in records {
1151				storage.delete(record.partition, &record.pk).await?;
1152			}
1153		}
1154		Ok(())
1155	}
1156
1157	/// Runs all test suites against the given storage adaptor.
1158	pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1159		// put tests
1160		test_put_insert(storage).await;
1161		test_put_upsert(storage).await;
1162		test_put_with_sort_key(storage).await;
1163		test_put_without_sort_key(storage).await;
1164		test_put_multiple_partitions(storage).await;
1165
1166		// get tests
1167		test_get_existing(storage).await;
1168		test_get_after_update(storage).await;
1169
1170		// delete tests
1171		test_delete_existing(storage).await;
1172		test_delete_nonexistent(storage).await;
1173		test_delete_idempotent(storage).await;
1174
1175		// query tests
1176		test_query_empty_partition(storage).await;
1177		test_query_returns_partition_records(storage).await;
1178		test_query_ordering(storage).await;
1179		test_query_with_limit(storage).await;
1180		test_query_null_sort_key_excluded(storage).await;
1181		test_query_partition_isolation(storage).await;
1182		test_query_range(storage).await;
1183		test_query_exclusive_end_range(storage).await;
1184		test_query_full_range_limit_one(storage).await;
1185
1186		// get_all tests
1187		test_get_all_empty_partition(storage).await;
1188		test_get_all_returns_all_records(storage).await;
1189		test_get_all_includes_records_without_sort_key(storage).await;
1190		test_get_all_partition_isolation(storage).await;
1191		test_get_all_after_delete(storage).await;
1192
1193		// incremental_id tests
1194		test_incremental_id_starts_at_one(storage).await;
1195		test_incremental_id_increments(storage).await;
1196		test_incremental_id_partition_isolation(storage).await;
1197		test_incremental_id_persists_across_operations(storage).await;
1198	}
1199
1200	/// Tests that put inserts a new record.
1201	pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1202		let record = Record {
1203			pk: "put_insert_1".into(),
1204			partition: 0,
1205			sort_key: None,
1206			data: b"test data".to_vec(),
1207		};
1208
1209		storage.put(record).await.expect("put should succeed");
1210
1211		let retrieved = storage
1212			.get(0, b"put_insert_1")
1213			.await
1214			.expect("get should succeed")
1215			.expect("record should exist");
1216
1217		assert_eq!(retrieved.pk, b"put_insert_1");
1218		assert_eq!(retrieved.partition, 0);
1219		assert_eq!(retrieved.data, b"test data");
1220	}
1221
1222	/// Tests that put updates an existing record (upsert behavior).
1223	pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1224		let record1 = Record {
1225			pk: b"put_upsert_1".into(),
1226			partition: 0,
1227			sort_key: None,
1228			data: b"original".to_vec(),
1229		};
1230		storage.put(record1).await.expect("first put should succeed");
1231
1232		let record2 = Record {
1233			pk: "put_upsert_1".into(),
1234			partition: 0,
1235			sort_key: None,
1236			data: b"updated".to_vec(),
1237		};
1238		storage
1239			.put(record2)
1240			.await
1241			.expect("second put should succeed");
1242
1243		let retrieved = storage
1244			.get(0, b"put_upsert_1")
1245			.await
1246			.expect("get should succeed")
1247			.expect("record should exist");
1248
1249		assert_eq!(retrieved.data, b"updated", "data should be updated");
1250	}
1251
1252	/// Tests that put correctly stores the sort key.
1253	pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1254		let sort_key = SortKey::u32_asc(42);
1255		let record = Record {
1256			pk: b"put_sort_key_1".into(),
1257			partition: 0,
1258			sort_key: Some(sort_key.clone()),
1259			data: b"with sort key".to_vec(),
1260		};
1261
1262		storage.put(record).await.expect("put should succeed");
1263
1264		let retrieved = storage
1265			.get(0, b"put_sort_key_1")
1266			.await
1267			.expect("get should succeed")
1268			.expect("record should exist");
1269
1270		assert_eq!(retrieved.sort_key, Some(sort_key));
1271	}
1272
1273	/// Tests that put correctly handles records without sort keys.
1274	pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1275		let record = Record {
1276			pk: b"put_no_sort_key_1".into(),
1277			partition: 0,
1278			sort_key: None,
1279			data: b"no sort key".to_vec(),
1280		};
1281
1282		storage.put(record).await.expect("put should succeed");
1283
1284		let retrieved = storage
1285			.get(0, b"put_no_sort_key_1")
1286			.await
1287			.expect("get should succeed")
1288			.expect("record should exist");
1289
1290		assert!(retrieved.sort_key.is_none());
1291	}
1292
1293	/// Tests that put correctly handles multiple partitions.
1294	pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1295		let record_a = Record {
1296			pk: "put_multi_a".into(),
1297			partition: 0,
1298			sort_key: None,
1299			data: b"in partition a".to_vec(),
1300		};
1301		let record_b = Record {
1302			pk: "put_multi_b".into(),
1303			partition: 1,
1304			sort_key: None,
1305			data: b"in partition b".to_vec(),
1306		};
1307
1308		storage.put(record_a).await.expect("put a should succeed");
1309		storage.put(record_b).await.expect("put b should succeed");
1310
1311		let retrieved_a = storage
1312			.get(0, b"put_multi_a")
1313			.await
1314			.expect("get should succeed")
1315			.expect("record a should exist");
1316		let retrieved_b = storage
1317			.get(1, b"put_multi_b")
1318			.await
1319			.expect("get should succeed")
1320			.expect("record b should exist");
1321
1322		assert_eq!(retrieved_a.partition, 0);
1323		assert_eq!(retrieved_b.partition, 1);
1324	}
1325
1326	/// Tests that get returns an existing record.
1327	pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1328		let record = Record {
1329			pk: b"get_existing_1".into(),
1330			partition: 0,
1331			sort_key: Some(SortKey::u32_asc(100)),
1332			data: b"test".to_vec(),
1333		};
1334		storage.put(record).await.expect("put should succeed");
1335
1336		let retrieved = storage
1337			.get(0, b"get_existing_1")
1338			.await
1339			.expect("get should succeed");
1340
1341		assert!(retrieved.is_some());
1342		let retrieved = retrieved.unwrap();
1343		assert_eq!(retrieved.pk, b"get_existing_1");
1344		assert_eq!(retrieved.partition, 0);
1345		assert_eq!(retrieved.data, b"test");
1346
1347		// superset of the key
1348		assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1349		// subset of the key
1350		assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1351
1352		// non-existent key
1353		assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1354	}
1355
1356	/// Tests that get returns updated data after put.
1357	pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1358		let record1 = Record {
1359			pk: b"get_after_update_1".into(),
1360			partition: 0,
1361			sort_key: None,
1362			data: b"version1".to_vec(),
1363		};
1364		storage.put(record1).await.expect("put should succeed");
1365
1366		let record2 = Record {
1367			pk: b"get_after_update_1".into(),
1368			partition: 0,
1369			sort_key: None,
1370			data: b"version2".to_vec(),
1371		};
1372		storage.put(record2).await.expect("put should succeed");
1373
1374		let retrieved = storage
1375			.get(0, b"get_after_update_1")
1376			.await
1377			.expect("get should succeed")
1378			.expect("record should exist");
1379
1380		assert_eq!(retrieved.data, b"version2");
1381	}
1382
1383	/// Tests that delete removes an existing record and returns it.
1384	pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1385		let record = Record {
1386			pk: b"delete_existing_1".into(),
1387			partition: 0,
1388			sort_key: None,
1389			data: b"to delete".to_vec(),
1390		};
1391		storage.put(record.clone()).await.expect("put should succeed");
1392
1393		let deleted_record = storage
1394			.delete(0, b"delete_existing_1")
1395			.await
1396			.expect("delete should succeed");
1397
1398		assert_eq!(deleted_record, Some(record));
1399
1400		let retrieved = storage
1401			.get(0, b"delete_existing_1")
1402			.await
1403			.expect("get should succeed");
1404		assert!(retrieved.is_none(), "record should no longer exist");
1405	}
1406
1407	/// Tests that delete returns None for non-existent records.
1408	pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1409		let deleted_record = storage
1410			.delete(0, b"delete_nonexistent_does_not_exist")
1411			.await
1412			.expect("delete should succeed");
1413
1414		assert!(
1415			deleted_record.is_none(),
1416			"delete should return None for non-existent record"
1417		);
1418	}
1419
1420	/// Tests that delete is idempotent (second delete returns None).
1421	pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1422		let record = Record {
1423			pk: b"delete_idempotent_1".into(),
1424			partition: 0,
1425			sort_key: None,
1426			data: b"delete twice".to_vec(),
1427		};
1428		storage.put(record.clone()).await.expect("put should succeed");
1429
1430		let first_delete = storage
1431			.delete(0, b"delete_idempotent_1")
1432			.await
1433			.expect("first delete should succeed");
1434		let second_delete = storage
1435			.delete(0, b"delete_idempotent_1")
1436			.await
1437			.expect("second delete should succeed");
1438
1439		assert_eq!(first_delete, Some(record), "first delete should return the record");
1440		assert_eq!(second_delete, None, "second delete should return None");
1441	}
1442
1443	/// Tests that query returns empty results for empty partition.
1444	pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1445		clear_partitions(storage, &[0]).await.unwrap();
1446		let results = storage
1447			.query_sorted(Query::new_full_range(0))
1448			.await
1449			.expect("query should succeed");
1450
1451		assert!(results.is_empty());
1452	}
1453
1454	/// Tests that query returns all records in a partition.
1455	pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1456		clear_partitions(storage, &[0]).await.unwrap();
1457		for i in 0..3 {
1458			let record = Record {
1459				pk: format!("query_partition_{}", i).into(),
1460				partition: 0,
1461				sort_key: Some(SortKey::u32_asc(i)),
1462				data: format!("record_{}", i).as_bytes().to_vec(),
1463			};
1464			storage.put(record).await.expect("put should succeed");
1465		}
1466
1467		let results = storage
1468			.query_sorted(Query::new_full_range(0))
1469			.await
1470			.expect("query should succeed");
1471
1472		assert_eq!(results.len(), 3);
1473	}
1474
1475	/// Tests that query returns records in ascending sort key order by default.
1476	pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1477		clear_partitions(storage, &[0]).await.unwrap();
1478		// Insert in non-sequential order
1479		for i in [5, 2, 8, 1, 9] {
1480			let record = Record {
1481				pk: format!("query_asc_{}", i).into(),
1482				partition: 0,
1483				sort_key: Some(SortKey::u32_asc(i)),
1484				data: format!("record_{}", i).as_bytes().to_vec(),
1485			};
1486			storage.put(record).await.expect("put should succeed");
1487		}
1488
1489		let results = storage
1490			.query_sorted(Query::new_full_range(0))
1491			.await
1492			.expect("query should succeed");
1493
1494		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1495		assert_eq!(
1496			values,
1497			vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1498			"should be in ascending order"
1499		);
1500	}
1501
1502	/// Tests that query with limit returns at most N records.
1503	pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1504		clear_partitions(storage, &[0]).await.unwrap();
1505		for i in 0..10 {
1506			let record = Record {
1507				pk: format!("query_limit_{}", i).into(),
1508				partition: 0,
1509				sort_key: Some(SortKey::u32_asc(i)),
1510				data: format!("record_{}", i).as_bytes().to_vec(),
1511			};
1512			storage.put(record).await.expect("put should succeed");
1513		}
1514
1515		let results = storage
1516			.query_sorted(Query::new_full_range(0).limit(3))
1517			.await
1518			.expect("query should succeed");
1519
1520		assert_eq!(results.len(), 3);
1521		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1522		assert_eq!(
1523			values,
1524			vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1525			"should return first 3 records"
1526		);
1527	}
1528
1529	/// Tests that records without sort keys don't appear in query
1530	pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1531		clear_partitions(storage, &[0]).await.unwrap();
1532		// Records with sort keys
1533		let with_key_1 = Record {
1534			pk: "query_null_with_1".into(),
1535			partition: 0,
1536			sort_key: Some(SortKey::u32_asc(1)),
1537			data: b"with_key_1".to_vec(),
1538		};
1539		let with_key_2 = Record {
1540			pk: "query_null_with_2".into(),
1541			partition: 0,
1542			sort_key: Some(SortKey::u32_asc(2)),
1543			data: b"with_key_2".to_vec(),
1544		};
1545
1546		// Record without sort key
1547		let without_key = Record {
1548			pk: "query_null_without".into(),
1549			partition: 0,
1550			sort_key: None,
1551			data: b"no_key".to_vec(),
1552		};
1553
1554		storage.put(with_key_1).await.expect("put should succeed");
1555		storage.put(without_key).await.expect("put should succeed");
1556		storage.put(with_key_2).await.expect("put should succeed");
1557
1558		// query should only return records with sort keys
1559		let results_query = storage.query_sorted(Query::new_full_range(0)).await
1560			.expect("query should succeed");
1561		assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1562		assert_eq!(results_query[0].data, b"with_key_1");
1563		assert_eq!(results_query[1].data, b"with_key_2");
1564
1565		// get_all should return all records
1566		let results_all = storage.get_all(0).await
1567			.expect("get_all should succeed");
1568		assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1569
1570		// Verify all three records are present (order not guaranteed for get_all)
1571		let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1572		let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1573		let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1574		assert!(has_with_key_1, "get_all should include with_key_1");
1575		assert!(has_with_key_2, "get_all should include with_key_2");
1576		assert!(has_without_key, "get_all should include record without sort key");
1577	}
1578
1579	/// Tests that query only returns records from the specified partition.
1580	pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1581		clear_partitions(storage, &[0, 1]).await.unwrap();
1582		// Records in partition A
1583		for i in 0..3 {
1584			let record = Record {
1585				pk: format!("query_iso_a_{}", i).into(),
1586				partition: 0,
1587				sort_key: Some(SortKey::u32_asc(i)),
1588				data: format!("record_{}", i).as_bytes().to_vec(),
1589			};
1590			storage.put(record).await.expect("put should succeed");
1591		}
1592
1593		// Records in partition B
1594		for i in 0..5 {
1595			let record = Record {
1596				pk: format!("query_iso_b_{}", i).into(),
1597				partition: 1,
1598				sort_key: Some(SortKey::u32_asc(i)),
1599				data: format!("record_{}", i + 100).as_bytes().to_vec(),
1600			};
1601			storage.put(record).await.expect("put should succeed");
1602		}
1603
1604		let results_a = storage
1605			.query_sorted(Query::new_full_range(0))
1606			.await
1607			.expect("query should succeed");
1608
1609		let results_b = storage
1610			.query_sorted(Query::new_full_range(1))
1611			.await
1612			.expect("query should succeed");
1613
1614		assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1615		assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1616
1617		// Verify all results_a are from partition A
1618		assert!(results_a
1619			.iter()
1620			.all(|r| r.partition == 0));
1621
1622		// Verify all results_b are from partition B
1623		assert!(results_b
1624			.iter()
1625			.all(|r| r.partition == 1));
1626	}
1627
1628	/// Tests that query with start and end keys returns only records within the range.
1629	pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1630		clear_partitions(storage, &[0]).await.unwrap();
1631
1632		// Insert records with sort keys 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
1633		for i in 1..=10u32 {
1634			let record = Record {
1635				pk: format!("query_range_{}", i).into(),
1636				partition: 0,
1637				sort_key: Some(SortKey::u32_asc(i)),
1638				data: format!("record_{}", i).as_bytes().to_vec(),
1639			};
1640			storage.put(record).await.expect("put should succeed");
1641		}
1642
1643		// Query with start key only (>= 5)
1644		let results_start = storage
1645			.query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1646			.await
1647			.expect("query should succeed");
1648
1649		assert_eq!(results_start.len(), 6, "should return records 5-10");
1650		let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1651		assert_eq!(
1652			values,
1653			vec![
1654				b"record_5".to_vec(),
1655				b"record_6".to_vec(),
1656				b"record_7".to_vec(),
1657				b"record_8".to_vec(),
1658				b"record_9".to_vec(),
1659				b"record_10".to_vec(),
1660			],
1661			"should return records from 5 onwards"
1662		);
1663
1664		// Query with end key only (<= 3)
1665		let results_end = storage
1666			.query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1667			.await
1668			.expect("query should succeed");
1669
1670		assert_eq!(results_end.len(), 3, "should return records 1-3");
1671		let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1672		assert_eq!(
1673			values,
1674			vec![
1675				b"record_1".to_vec(),
1676				b"record_2".to_vec(),
1677				b"record_3".to_vec(),
1678			],
1679			"should return records up to 3"
1680		);
1681
1682		// Query with both start and end keys (3 <= x <= 7)
1683		let results_range = storage
1684			.query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1685			.await
1686			.expect("query should succeed");
1687
1688		assert_eq!(results_range.len(), 5, "should return records 3-7");
1689		let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1690		assert_eq!(
1691			values,
1692			vec![
1693				b"record_3".to_vec(),
1694				b"record_4".to_vec(),
1695				b"record_5".to_vec(),
1696				b"record_6".to_vec(),
1697				b"record_7".to_vec(),
1698			],
1699			"should return records in range 3-7"
1700		);
1701
1702		// Query range with limit
1703		let results_range_limit = storage
1704			.query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1705			.await
1706			.expect("query should succeed");
1707
1708		assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1709		let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1710		assert_eq!(
1711			values,
1712			vec![
1713				b"record_2".to_vec(),
1714				b"record_3".to_vec(),
1715				b"record_4".to_vec(),
1716			],
1717			"should return first 3 records in range"
1718		);
1719
1720		// Query range that matches no records
1721		let results_empty = storage
1722			.query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1723			.await
1724			.expect("query should succeed");
1725
1726		assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1727	}
1728
1729	/// Tests that exclusive end ranges (start..end) exclude the end bound.
1730	pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1731		clear_partitions(storage, &[0]).await.unwrap();
1732
1733		for i in 1..=10u32 {
1734			let record = Record {
1735				pk: format!("query_excl_{}", i).into(),
1736				partition: 0,
1737				sort_key: Some(SortKey::u32_asc(i)),
1738				data: format!("record_{}", i).as_bytes().to_vec(),
1739			};
1740			storage.put(record).await.expect("put should succeed");
1741		}
1742
1743		// Exclusive end: 3 <= x < 7 (should return records 3, 4, 5, 6)
1744		let results = storage
1745			.query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1746			.await
1747			.expect("query should succeed");
1748
1749		assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1750		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1751		assert_eq!(
1752			values,
1753			vec![
1754				b"record_3".to_vec(),
1755				b"record_4".to_vec(),
1756				b"record_5".to_vec(),
1757				b"record_6".to_vec(),
1758			],
1759		);
1760
1761		// Exclusive end only: x < 4 (should return records 1, 2, 3)
1762		let results = storage
1763			.query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1764			.await
1765			.expect("query should succeed");
1766
1767		assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1768		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1769		assert_eq!(
1770			values,
1771			vec![
1772				b"record_1".to_vec(),
1773				b"record_2".to_vec(),
1774				b"record_3".to_vec(),
1775			],
1776		);
1777	}
1778
1779	/// Tests that full-range query with limit 1 returns the first sorted record.
1780	pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1781		clear_partitions(storage, &[0]).await.unwrap();
1782
1783		for i in [5u32, 2, 8, 1, 9] {
1784			let record = Record {
1785				pk: format!("query_limit1_{}", i).into(),
1786				partition: 0,
1787				sort_key: Some(SortKey::u32_asc(i)),
1788				data: format!("record_{}", i).as_bytes().to_vec(),
1789			};
1790			storage.put(record).await.expect("put should succeed");
1791		}
1792
1793		let results = storage
1794			.query_sorted(Query::new_full_range(0).limit(1))
1795			.await
1796			.expect("query should succeed");
1797
1798		assert_eq!(results.len(), 1);
1799		assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1800	}
1801
1802	/// Tests that incremental_id returns 1 for the first call on a partition.
1803	pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1804		// Clear the LAST_IDS entry for partition 0
1805		storage.delete(LAST_IDS, b"0").await.unwrap();
1806		let id = storage.incremental_id(0).await
1807			.expect("incremental_id should succeed");
1808
1809		assert_eq!(id, 1, "first id should be 1");
1810	}
1811
1812	/// Tests that incremental_id increments on subsequent calls.
1813	pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1814		clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1815
1816		let id1 = storage.incremental_id(0).await
1817			.expect("incremental_id should succeed");
1818		let id2 = storage.incremental_id(0).await
1819			.expect("incremental_id should succeed");
1820		let id3 = storage.incremental_id(0).await
1821			.expect("incremental_id should succeed");
1822
1823		assert_eq!(id1, 1, "first id should be 1");
1824		assert_eq!(id2, 2, "second id should be 2");
1825		assert_eq!(id3, 3, "third id should be 3");
1826	}
1827
1828	/// Tests that incremental_id maintains separate sequences for different partitions.
1829	pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1830		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1831
1832		// Generate IDs for partition A
1833		let a1 = storage.incremental_id(0).await
1834			.expect("incremental_id should succeed");
1835		let a2 = storage.incremental_id(0).await
1836			.expect("incremental_id should succeed");
1837		let a3 = storage.incremental_id(0).await
1838			.expect("incremental_id should succeed");
1839
1840		// Generate IDs for partition B
1841		let b1 = storage.incremental_id(1).await
1842			.expect("incremental_id should succeed");
1843		let b2 = storage.incremental_id(1).await
1844			.expect("incremental_id should succeed");
1845
1846		// Partition A should have 1, 2, 3
1847		assert_eq!(a1, 1);
1848		assert_eq!(a2, 2);
1849		assert_eq!(a3, 3);
1850
1851		// Partition B should have its own sequence starting at 1
1852		assert_eq!(b1, 1);
1853		assert_eq!(b2, 2);
1854
1855		// Generate more for A - should continue from 3
1856		let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1857		assert_eq!(a4, 4);
1858	}
1859
1860	/// Tests that incremental_id persists its state correctly.
1861	pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1862		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1863
1864		// Generate some IDs
1865		let id1 = storage.incremental_id(0).await
1866			.expect("incremental_id should succeed");
1867		let id2 = storage.incremental_id(0).await
1868			.expect("incremental_id should succeed");
1869		assert_eq!(id1, 1);
1870		assert_eq!(id2, 2);
1871
1872		// Verify the stored value can be retrieved directly
1873		let stored = storage
1874			.get(LAST_IDS, &[0])
1875			.await
1876			.expect("get should succeed")
1877			.expect("id record should exist");
1878		let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1879		assert_eq!(stored_id, 2, "stored id should be 2");
1880
1881		// Continue generating - should pick up where we left off
1882		let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1883		assert_eq!(id3, 3);
1884	}
1885
1886	/// Tests that get_all returns empty results for empty partition.
1887	pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1888		clear_partitions(storage, &[0]).await.unwrap();
1889		let results = storage
1890			.get_all(0)
1891			.await
1892			.expect("get_all should succeed");
1893
1894		assert!(results.is_empty(), "get_all should return empty for empty partition");
1895	}
1896
1897	/// Tests that get_all returns all records in a partition.
1898	pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1899		clear_partitions(storage, &[0]).await.unwrap();
1900
1901		// Insert records with sort keys
1902		for i in 0..5 {
1903			let record = Record {
1904				pk: format!("get_all_{}", i).into(),
1905				partition: 0,
1906				sort_key: Some(SortKey::u32_asc(i)),
1907				data: format!("record_{}", i).as_bytes().to_vec(),
1908			};
1909			storage.put(record).await.expect("put should succeed");
1910		}
1911
1912		let results = storage
1913			.get_all(0)
1914			.await
1915			.expect("get_all should succeed");
1916
1917		assert_eq!(results.len(), 5, "get_all should return all 5 records");
1918
1919		// Verify all records are present (order not guaranteed)
1920		for i in 0..5 {
1921			let expected_data = format!("record_{}", i).as_bytes().to_vec();
1922			let found = results.iter().any(|r| r.data == expected_data);
1923			assert!(found, "get_all should include record_{}", i);
1924		}
1925	}
1926
1927	/// Tests that get_all includes records without sort keys.
1928	pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1929		clear_partitions(storage, &[0]).await.unwrap();
1930
1931		// Insert records with sort keys
1932		let with_key_1 = Record {
1933			pk: "get_all_with_1".into(),
1934			partition: 0,
1935			sort_key: Some(SortKey::u32_asc(1)),
1936			data: b"with_key_1".to_vec(),
1937		};
1938		let with_key_2 = Record {
1939			pk: "get_all_with_2".into(),
1940			partition: 0,
1941			sort_key: Some(SortKey::u32_asc(2)),
1942			data: b"with_key_2".to_vec(),
1943		};
1944
1945		// Insert records without sort keys
1946		let without_key_1 = Record {
1947			pk: "get_all_without_1".into(),
1948			partition: 0,
1949			sort_key: None,
1950			data: b"without_key_1".to_vec(),
1951		};
1952		let without_key_2 = Record {
1953			pk: "get_all_without_2".into(),
1954			partition: 0,
1955			sort_key: None,
1956			data: b"without_key_2".to_vec(),
1957		};
1958
1959		storage.put(with_key_1).await.expect("put should succeed");
1960		storage.put(without_key_1).await.expect("put should succeed");
1961		storage.put(with_key_2).await.expect("put should succeed");
1962		storage.put(without_key_2).await.expect("put should succeed");
1963
1964		let results = storage
1965			.get_all(0)
1966			.await
1967			.expect("get_all should succeed");
1968
1969		assert_eq!(results.len(), 4, "get_all should return all 4 records");
1970
1971		// Verify all records are present
1972		let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1973		let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1974		let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1975		let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1976
1977		assert!(has_with_1, "get_all should include with_key_1");
1978		assert!(has_with_2, "get_all should include with_key_2");
1979		assert!(has_without_1, "get_all should include without_key_1");
1980		assert!(has_without_2, "get_all should include without_key_2");
1981
1982		// Verify that query excludes records without sort keys
1983		let query_results = storage
1984			.query_sorted(Query::new_full_range(0))
1985			.await
1986			.expect("query should succeed");
1987
1988		assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
1989		let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
1990		assert!(!query_has_without, "query should not include records without sort keys");
1991	}
1992
1993	/// Tests that get_all only returns records from the specified partition.
1994	pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1995		clear_partitions(storage, &[0, 1]).await.unwrap();
1996
1997		// Insert records in partition 0
1998		for i in 0..3 {
1999			let record = Record {
2000				pk: format!("partition_0_{}", i).into(),
2001				partition: 0,
2002				sort_key: Some(SortKey::u32_asc(i)),
2003				data: format!("p0_record_{}", i).as_bytes().to_vec(),
2004			};
2005			storage.put(record).await.expect("put should succeed");
2006		}
2007
2008		// Insert records in partition 1
2009		for i in 0..2 {
2010			let record = Record {
2011				pk: format!("partition_1_{}", i).into(),
2012				partition: 1,
2013				sort_key: Some(SortKey::u32_asc(i)),
2014				data: format!("p1_record_{}", i).as_bytes().to_vec(),
2015			};
2016			storage.put(record).await.expect("put should succeed");
2017		}
2018
2019		// get_all partition 0
2020		let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2021		assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2022		assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2023
2024		// get_all partition 1
2025		let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2026		assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2027		assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2028	}
2029
2030	/// Tests that get_all reflects deletions.
2031	pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2032		clear_partitions(storage, &[0]).await.unwrap();
2033
2034		for i in 0..3u32 {
2035			let record = Record {
2036				pk: format!("get_all_del_{}", i).into(),
2037				partition: 0,
2038				sort_key: Some(SortKey::u32_asc(i)),
2039				data: format!("record_{}", i).as_bytes().to_vec(),
2040			};
2041			storage.put(record).await.expect("put should succeed");
2042		}
2043
2044		storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2045
2046		let results = storage.get_all(0).await.expect("get_all should succeed");
2047		assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2048
2049		let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2050		assert!(!has_deleted, "deleted record should not appear");
2051	}
2052}