Skip to main content

bark/persist/adaptor/
mod.rs

//! Storage adaptor module providing the [StorageAdaptor] trait and the
//! [StorageAdaptorWrapper], which implements [BarkPersister] on top of any
//! type implementing [StorageAdaptor].
3//!
4//! This module provides an optimized single-table storage abstraction that can be
5//! efficiently implemented on various backends (SQLite, Postgres, MongoDB, Firebase,
6//! in-memory, etc.).
7//!
8//! The design uses structured keys:
9//! - **Primary key (`pk`)**: Unique identifier for each record
10//! - **Partition key**: Groups related records for efficient querying
11//! - **Sort key**: Enables ordered iteration and range queries
12//!
13//! # Example
14//!
15//! ```rust
16//! # use bark::persist::adaptor::memory::MemoryStorageAdaptor;
17//! # use bark::persist::adaptor::{Query, Record, StorageAdaptor, SortKey};
18//!
19//! # async fn example() -> anyhow::Result<()> {
20//! // Create an in-memory storage adaptor
21//! let mut storage = MemoryStorageAdaptor::new();
22//!
23//! // Store a record sorted by a numeric field (ascending)
24//! let record = Record {
25//!     partition: 0,
26//!     pk: "item:1".into(),
27//!     sort_key: Some(SortKey::u32_asc(42)),
28//!     data: b"hello world".to_vec(),
29//! };
30//! storage.put(record).await?;
31//!
32//! // Query with efficient index scan
33//! let query = Query::new_full_range(0).limit(10);
34//! let records = storage.query_sorted(query).await?;
35//! # Ok(())
36//! # }
37//! ```
38
39mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71	Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75	LightningReceive, PaidInvoice, PendingBoard, PendingOffboard,
76	RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77	StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
/// Partition identifiers: one `u8` per logical record collection stored in
/// the single storage table.
///
/// Every record carries one of these values in its `partition` field and all
/// lookups and queries are scoped to a single partition.
pub mod partition {
	/// Wallet properties; a single record stored under the empty primary key.
	pub const PROPERTIES: u8 = 0;
	/// Merged BDK wallet changeset; a single record stored under the empty
	/// primary key. Only accessed by methods gated on the `onchain-bdk`
	/// feature, which is presumably why it may be unused.
	#[allow(unused)]
	pub const BDK_CHANGESET: u8 = 1;
	/// Stored vtxos with their state history, keyed by vtxo id.
	pub const VTXO: u8 = 2;
	/// Vtxo key records (index + public key), keyed by the serialized
	/// public key and sorted by key index.
	pub const PUBLIC_KEY: u8 = 3;
	/// Pending boards, keyed by the id of the board vtxo.
	pub const PENDING_BOARD: u8 = 4;
	/// Round states, keyed by round state id.
	pub const ROUND_STATE: u8 = 5;
	/// Movements, keyed by movement id and sorted by creation time.
	pub const MOVEMENT: u8 = 6;
	/// Was used by the now-removed `bark_lightning_send`. Do not reuse.
	#[allow(unused)]
	pub const LEGACY_LIGHTNING_SEND: u8 = 7;
	/// Lightning receives, keyed by payment hash.
	pub const LIGHTNING_RECEIVE: u8 = 8;
	/// NOTE(review): not referenced in the visible part of this file;
	/// presumably exit vtxo records -- confirm against the exit methods.
	pub const EXIT_VTXO: u8 = 9;
	/// NOTE(review): not referenced in the visible part of this file;
	/// presumably exit child transactions -- confirm against the exit methods.
	pub const EXIT_CHILD_TX: u8 = 10;
	/// Mailbox checkpoint (`u64`); a single record stored under the empty
	/// primary key.
	pub const MAILBOX_CHECKPOINT: u8 = 11;
	/// Pending offboards, keyed by movement id.
	pub const PENDING_OFFBOARD: u8 = 12;
	/// An index table for payment method to movement: the sort key is the
	/// serialized payment method and the payload is the movement id.
	pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
	/// Per-action checkpoint payloads keyed by [WalletActionId].
	pub const WALLET_ACTION_CHECKPOINT: u8 = 14;
	/// Permanent record of every settled outgoing lightning payment,
	/// keyed by payment hash.
	pub const PAID_INVOICE: u8 = 15;

	/// Last id handed out per partition, keyed by the partition byte (used
	/// by the adaptor's incremental id allocation).
	pub const LAST_IDS: u8 = u8::MAX;
}
111
/// A storage record with structured keys.
///
/// A record is addressed by its `(partition, pk)` pair; the optional
/// `sort_key` makes it visible to sorted queries and `data` carries the
/// serialized payload.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record {
	/// Partition key for grouping related records: one of the `u8`
	/// constants of the [`partition`] module (e.g. [`partition::VTXO`],
	/// [`partition::MOVEMENT`]).
	///
	/// Queries always filter by partition.
	pub partition: u8,

	/// Primary key bytes identifying the record within its partition.
	pub pk: Vec<u8>,

	/// Optional sort key for ordered iteration within a partition.
	///
	/// Use [`SortKey::builder()`] to construct composite keys with
	/// mixed sort directions.
	///
	/// This field may be set or changed after record insertion.
	/// Implementation should support updating the sort key of a
	/// record post-insert if needed.
	pub sort_key: Option<SortKey>,

	/// The record data encoded as JSON.
	pub data: Vec<u8>,
}
136
137impl Record {
138	/// Converts the record data to a typed value.
139	fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
140		serde_json::from_slice(&self.data).map_err(Into::into)
141	}
142
143	/// Creates a new record from a typed value.
144	fn from_data<T: Serialize>(
145		partition: u8,
146		pk: &[u8],
147		sort_key: Option<SortKey>,
148		data: &T,
149	) -> anyhow::Result<Record> {
150		Ok(Record {
151			partition,
152			pk: pk.to_vec(),
153			sort_key,
154			data: serde_json::to_vec(data)?,
155		})
156	}
157}
158
159/// A range of sort keys.
160pub trait QueryRange: RangeBounds<SortKey> + Send {}
161
162impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
163
164/// Query specification for retrieving sorted records from a partition.
165#[derive(Debug, Clone)]
166pub struct Query<R: QueryRange> {
167	/// Partition to query (required).
168	pub partition: u8,
169
170	/// Range of sort keys to query.
171	pub range: R,
172
173	/// Maximum number of records to return.
174	pub limit: Option<usize>,
175}
176
177impl<R: QueryRange> Query<R> {
178	/// Creates a new query for the given partition.
179	pub fn new(partition: u8, range: R) -> Self {
180		Self {
181			partition,
182			range,
183			limit: None,
184		}
185	}
186
187	/// Sets the maximum number of records to return.
188	///
189	/// If the range is greater than the limit, the query will
190	/// return the first `limit` records.
191	pub fn limit(mut self, limit: usize) -> Self {
192		self.limit = Some(limit);
193		self
194	}
195}
196
197impl Query<std::ops::RangeFull> {
198	pub fn new_full_range(partition: u8) -> Self {
199		Self::new(partition, ..)
200	}
201}
202
203fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
204	let body = pm.value_string();
205
206	let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
207	buf.extend(pm.type_str().as_bytes().iter().copied());
208	// nb we need to separate with a non-utf8 byte
209	// we use 0xfe so that we can easily query on prefix from <type>0xfe .. <type>0xff
210	buf.push(0xfe);
211	buf.extend(body.into_bytes());
212	buf
213}
214
215/// Storage adaptor trait for persistence backends.
216///
217/// This trait provides a minimal interface (5 methods) that can be efficiently
218/// implemented on various storage backends while enabling query optimization.
219///
220/// # Implementor's Guide
221///
222/// ## Simple backends (memory, file-based)
223///
224/// Store records in a map/list keyed by `(partition, pk)`.
225///
226/// - `query_sorted`: query sorted records by partition, excluding those without
227///	  a sort key defined.
228/// - `get_all`: return every record in the partition regardless of whether it
229///   has a sort key. No ordering is guaranteed.
230///
231/// ## Database backends (Postgres, MongoDB, Firebase, IndexedDB, etc.)
232///
233/// Create a single table with indexes:
234///
235/// ```sql
236/// CREATE TABLE storage (
237///     pk TEXT PRIMARY KEY,
238///     partition TEXT NOT NULL,
239///     sort_key BLOB,
240///     data BLOB NOT NULL
241/// );
242/// CREATE INDEX idx_partition_sort ON storage(partition, sort_key);
243/// ```
244///
245/// Translate [`Query`] to SQL:
246///
247/// ```sql
248/// -- query_sorted: only returns records with a sort key
249/// SELECT * FROM storage
250/// WHERE partition = :partition AND sort_key IS NOT NULL
251/// ORDER BY sort_key DESC
252/// ```
253///
254/// Translate [`get_all`](StorageAdaptor::get_all) to SQL:
255///
256/// ```sql
257/// -- get_all: returns all records in the partition, no ordering
258/// SELECT * FROM storage
259/// WHERE partition = :partition
260/// ```
261#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
263pub trait StorageAdaptor: Send + Sync + 'static {
264	/// Stores a record, inserting or updating by primary key.
265	async fn put(&mut self, record: Record) -> anyhow::Result<()>;
266
267	/// Retrieves a record by primary key.
268	///
269	/// Returns `None` if the record doesn't exist.
270	async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272	/// Deletes a record by primary key.
273	///
274	/// Returns the deleted record if it existed, `None` otherwise.
275	async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
276
277	/// Queries sorted records from a partition.
278	///
279	/// Results are ordered by sort key.
280	/// Records without a sort key must not be returned, to query all
281	/// records use [`StorageAdaptor::get_all`].
282	async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
283		-> anyhow::Result<Vec<Record>>;
284
285	/// Get all records in a partition.
286	///
287	/// No ordering guarantees are made.
288	async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
289
290	/// Increments the last partition id, then stores and returns the new id.
291	async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
292		let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
293			.map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
294		let next_partition_id = last_partition_id + 1;
295
296		let record = Record::from_data(
297			partition::LAST_IDS,
298			&[partition],
299			None,
300			&next_partition_id,
301		)?;
302
303		self.put(record).await?;
304		Ok(next_partition_id)
305	}
306}
307
308async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
309	match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
310		Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
311		None => Ok(None),
312	}
313}
314
315async fn get_check_vtxo_state<S: StorageAdaptor>(
316	adaptor: &S,
317	vtxo_id: VtxoId,
318	allowed_states: &[VtxoStateKind],
319) -> anyhow::Result<SerdeVtxo> {
320	let vtxo = get_vtxo(adaptor, vtxo_id).await?
321		.context("vtxo not found")?;
322
323	let current_state = vtxo.current_state().context("vtxo has no state")?;
324	if !allowed_states.contains(&current_state.kind()) {
325		bail!("current state {:?} not in allowed states {:?}",
326			current_state.kind(), allowed_states
327		);
328	}
329
330	Ok(vtxo)
331}
332
333async fn update_vtxo_state_checked<S: StorageAdaptor>(
334	adaptor: &mut S,
335	vtxo_id: VtxoId,
336	new_state: VtxoState,
337	allowed_old_states: &[VtxoStateKind],
338) -> anyhow::Result<WalletVtxo> {
339	let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
340
341	let sk = sort::vtxo_sort_key(
342		new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
343	);
344
345	serde_vtxo.states.push(new_state.clone());
346	let updated_record = Record::from_data(
347		partition::VTXO,
348		&vtxo_id.to_bytes(),
349		Some(sk),
350		&serde_vtxo,
351	)?;
352
353	adaptor.put(updated_record).await?;
354
355	Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
356}
357
358pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
359	inner: tokio::sync::RwLock<S>,
360}
361
362impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
363	pub fn new(inner: S) -> Self {
364		Self {
365			inner: tokio::sync::RwLock::new(inner),
366		}
367	}
368}
369
/// Implementation of `BarkPersister` for [`StorageAdaptorWrapper`], backed by any type implementing `StorageAdaptor`.
371#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
372#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
373impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
374	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
375		let record = Record::from_data(
376			partition::PROPERTIES,
377			// NB: a single set of properties is stored, so no need for primary key
378			&[],
379			None,
380			properties,
381		)?;
382		self.inner.write().await.put(record).await
383	}
384
385	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
386		match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
387			Some(record) => Ok(Some(record.to_data()?)),
388			None => Ok(None),
389		}
390	}
391
392	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
393		let mut properties = match self.read_properties().await? {
394			Some(properties) => properties,
395			None => bail!("wallet not initialized"),
396		};
397
398		properties.server_pubkey = Some(server_pubkey);
399
400		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
401		self.inner.write().await.put(record).await
402	}
403
404	async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
405		let mut properties = match self.read_properties().await? {
406			Some(properties) => properties,
407			None => bail!("wallet not initialized"),
408		};
409
410		properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
411
412		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
413		self.inner.write().await.put(record).await
414	}
415
416	#[cfg(feature = "onchain-bdk")]
417	async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
418		match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
419			Some(record) => record.to_data(),
420			None => Ok(ChangeSet::default()),
421		}
422	}
423
424	#[cfg(feature = "onchain-bdk")]
425	async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
426		let mut current = self.initialize_bdk_wallet().await?;
427		current.merge(changeset.clone());
428
429		let record = Record::from_data(
430			partition::BDK_CHANGESET,
431			// NB: a single changeset is stored, so no need for primary key
432			&[],
433			None,
434			&current,
435		)?;
436		self.inner.write().await.put(record).await
437	}
438
439	async fn create_new_movement(
440		&self,
441		status: MovementStatus,
442		subsystem: &MovementSubsystem,
443		time: DateTime<Local>,
444	) -> anyhow::Result<MovementId> {
445		let mut lock = self.inner.write().await;
446
447		let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
448		let movement = Movement::new(id, status, subsystem, time);
449
450		let record = Record::from_data(
451			partition::MOVEMENT,
452			&id.to_bytes(),
453			Some(sort::movement_sort_key(&time)),
454			&movement,
455		)?;
456		lock.put(record).await?;
457
458		Ok(id)
459	}
460
461	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
462		let mut guard = self.inner.write().await;
463
464		let record = Record::from_data(
465			partition::MOVEMENT,
466			&movement.id.to_bytes(),
467			Some(sort::movement_sort_key(&movement.time.created_at)),
468			movement,
469		)?;
470		guard.put(record).await?;
471
472		// then add records for each payment method
473		let sent = movement.sent_to.iter().map(|d| &d.destination);
474		let rcvd = movement.received_on.iter().map(|d| &d.destination);
475		for pm in sent.chain(rcvd) {
476			let pm_bytes = serialize_payment_method(pm);
477			let primary_key = {
478				// We just need a unique key, but we will never query using this
479				let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
480				buf.extend(pm_bytes.iter().copied());
481				buf.extend(movement.id.to_bytes());
482				buf
483			};
484			let record = Record::from_data(
485				partition::MOVEMENT_PAYMENT_METHOD,
486				&primary_key,
487				Some(SortKey::from_bytes(pm_bytes)),
488				&movement.id.0,
489			)?;
490			guard.put(record).await?;
491		}
492
493		Ok(())
494	}
495
496	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
497		self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
498			.await?
499			.context("movement not found")?
500			.to_data()
501	}
502
503	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
504		let records = self.inner.read().await
505			.query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
506		records.into_iter().map(|r| r.to_data()).collect()
507	}
508
509	async fn get_movements_by_payment_method(
510		&self,
511		payment_method: &PaymentMethod,
512	) -> anyhow::Result<Vec<Movement>> {
513		let pm_bytes = serialize_payment_method(payment_method);
514		let sort_key = SortKey::from_bytes(pm_bytes);
515
516		let guard = self.inner.read().await;
517		let idx_recs = guard.query_sorted(Query::new(
518			partition::MOVEMENT_PAYMENT_METHOD,
519			sort_key.clone()..=sort_key,
520		)).await?;
521
522		let mut ret = Vec::with_capacity(idx_recs.len());
523		for idx_rec in idx_recs {
524			let id = MovementId::new(idx_rec.to_data::<u32>()
525				.context("corrupt db: movement payment method index value")?);
526
527			ret.push(
528				guard.get(partition::MOVEMENT, &id.to_bytes()).await?
529					.context("corrupt db: movement payment method entry for unknown movement")?
530					.to_data()
531					.context("corrupt db: invalid movement record")?
532			);
533		}
534		Ok(ret)
535	}
536
537	async fn store_pending_board(
538		&self,
539		vtxo: &Vtxo<Full>,
540		funding_tx: &Transaction,
541		movement_id: MovementId,
542	) -> anyhow::Result<()> {
543		let pending_board = PendingBoard {
544			vtxos: vec![vtxo.id()],
545			amount: vtxo.amount(),
546			funding_tx: funding_tx.clone(),
547			movement_id,
548		};
549
550		let record = Record::from_data(
551			partition::PENDING_BOARD,
552			&vtxo.id().to_bytes(),
553			None,
554			&pending_board,
555		)?;
556
557		self.inner.write().await.put(record).await
558	}
559
560	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
561		self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
562		Ok(())
563	}
564
565	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
566		let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
567		records
568			.into_iter()
569			.map(|r| {
570				let board = r.to_data::<PendingBoard>()?;
571				Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
572			})
573			.collect()
574	}
575
576	async fn get_pending_board_by_vtxo_id(
577		&self,
578		vtxo_id: VtxoId,
579	) -> anyhow::Result<Option<PendingBoard>> {
580		match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
581			Some(record) => Ok(Some(record.to_data()?)),
582			None => Ok(None),
583		}
584	}
585
586	async fn store_round_state(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
587		let mut lock = self.inner.write().await;
588
589		let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
590		let serde_state = SerdeRoundState::from(round_state);
591		let record = Record::from_data(
592			partition::ROUND_STATE,
593			&id.to_bytes(),
594			Some(sort::SortKey::u32_asc(id.0)),
595			&serde_state,
596		)?;
597		lock.put(record).await?;
598
599		Ok(id)
600	}
601
602	async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
603		let serde_state = SerdeRoundState::from(round_state.state());
604		let record = Record::from_data(
605			partition::ROUND_STATE,
606			&round_state.id().to_bytes(),
607			Some(sort::SortKey::u32_asc(round_state.id().0)),
608			&serde_state,
609		)?;
610		self.inner.write().await.put(record).await
611	}
612
613	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
614		self.inner.write().await
615			.delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
616		Ok(())
617	}
618
619	async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
620		let record = self.inner.read().await
621			.get(partition::ROUND_STATE, &_id.to_bytes()).await?;
622		match record {
623			Some(r) => {
624				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
625				let id = RoundStateId(u32::from_be_bytes(pk_slice));
626				let state = r.to_data::<SerdeRoundState>()?.into();
627				Ok(Some(StoredRoundState::new(id, state)))
628			},
629			None => Ok(None),
630		}
631	}
632
633	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
634		let records = self.inner.read().await
635			.get_all(partition::ROUND_STATE).await?;
636		records.into_iter()
637			.map(|r| {
638				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
639				Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
640			})
641			.collect()
642	}
643
644	async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
645		let mut lock = self.inner.write().await;
646
647		for (vtxo, state) in vtxos {
648			let serde_vtxo = SerdeVtxo {
649				vtxo: (*vtxo).clone(),
650				states: vec![(*state).clone()],
651			};
652
653			let sk = sort::vtxo_sort_key(
654				state.kind(), vtxo.expiry_height(), vtxo.amount(),
655			);
656			let record = Record::from_data(
657				partition::VTXO,
658				&vtxo.id().to_bytes(),
659				Some(sk),
660				&serde_vtxo,
661			)?;
662			lock.put(record).await?;
663		}
664		Ok(())
665	}
666
667	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
668		let lock = self.inner.read().await;
669		match get_vtxo(&*lock, id).await? {
670			Some(serde_vtxo) => {
671				let state = serde_vtxo.current_state()
672					.context("vtxo has no state")?.clone();
673				Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
674			},
675			None => Ok(None),
676		}
677	}
678
679	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
680		let records = self.inner.read().await
681			.query_sorted(Query::new_full_range(partition::VTXO)).await?;
682
683		records
684			.into_iter()
685			.map(|r| {
686				let serde_vtxo = r.to_data::<SerdeVtxo>()?;
687				let state = serde_vtxo
688					.current_state()
689					.cloned()
690					.context("vtxo has no state")?;
691				Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
692			})
693			.collect()
694	}
695
696	async fn get_vtxos_by_state(
697		&self,
698		states: &[VtxoStateKind],
699	) -> anyhow::Result<Vec<WalletVtxo>> {
700		let lock = self.inner.read().await;
701
702		let range = |state: VtxoStateKind| {
703			let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
704			let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
705			(start, end)
706		};
707
708		let mut records = Vec::new();
709		for state in states {
710			let (start, end) = range(*state);
711			let query = Query::new(partition::VTXO, start..=end);
712
713			for record in lock.query_sorted(query).await? {
714				let serde_vtxo = record.to_data::<SerdeVtxo>()?;
715				let current_state = serde_vtxo.current_state()
716					.context("vtxo has no current state")?.clone();
717				debug_assert_eq!(current_state.kind(), *state);
718				records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
719			}
720		}
721
722		Ok(records)
723	}
724
725	async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
726		let lock = self.inner.read().await;
727		Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
728	}
729
730	async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
731		let lock = self.inner.read().await;
732		let mut out = Vec::with_capacity(ids.len());
733		for id in ids {
734			let serde_vtxo = get_vtxo(&*lock, *id).await?
735				.with_context(|| format!("vtxo {id} not found"))?;
736			out.push(serde_vtxo.vtxo);
737		}
738		Ok(out)
739	}
740
741	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
742		match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
743			Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
744			None => Ok(None),
745		}
746	}
747
748	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
749		match self.get_wallet_vtxo(id).await? {
750			Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
751			None => Ok(false),
752		}
753	}
754
755	async fn update_vtxo_state_checked(
756		&self,
757		vtxo_id: VtxoId,
758		new_state: VtxoState,
759		allowed_old_states: &[VtxoStateKind],
760	) -> anyhow::Result<WalletVtxo> {
761		let mut lock = self.inner.write().await;
762		update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
763	}
764
765	async fn update_vtxo_states_checked(
766		&self,
767		vtxo_ids: &[VtxoId],
768		new_state: VtxoState,
769		allowed_old_states: &[VtxoStateKind],
770	) -> anyhow::Result<()> {
771		let mut lock = self.inner.write().await;
772		// Validate every vtxo before mutating anything, so a state-kind
773		// mismatch can't leave the batch half-applied. Concurrent batches
774		// are serialized by the write lock above. Storage errors during
775		// the put loop cannot be rolled back from here — that is a
776		// fundamental limitation of the adaptor.
777		for id in vtxo_ids {
778			get_check_vtxo_state(&*lock, *id, allowed_old_states).await?;
779		}
780		for id in vtxo_ids {
781			update_vtxo_state_checked(&mut *lock, *id, new_state.clone(), allowed_old_states).await?;
782		}
783		Ok(())
784	}
785
786	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
787		let vtxo_key = SerdeVtxoKey { index, public_key };
788		let record = Record::from_data(
789			partition::PUBLIC_KEY,
790			&public_key.serialize()[..],
791			Some(sort::SortKey::u64_desc(index as u64)),
792			&vtxo_key,
793		)?;
794		self.inner.write().await.put(record).await
795	}
796
797	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
798		// pks are sorted descending, so the first one is the highest index
799		let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
800		let records = self.inner.read().await.query_sorted(query).await?;
801
802		match records.into_iter().next() {
803			Some(record) => {
804				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
805				Ok(Some(vtxo_key.index))
806			}
807			None => Ok(None),
808		}
809	}
810
811	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
812		match self.inner.read().await
813			.get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
814		{
815			Some(record) => {
816				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
817				Ok(Some(vtxo_key.index))
818			}
819			None => Ok(None),
820		}
821	}
822
823	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
824		match self.inner.read().await
825			.get(partition::MAILBOX_CHECKPOINT, &[]).await?
826		{
827			Some(record) => Ok(record.to_data::<u64>()?),
828			None => Ok(0),
829		}
830	}
831
832	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
833		let mut lock = self.inner.write().await;
834		let record = Record::from_data(
835			partition::MAILBOX_CHECKPOINT,
836			&[],
837			None,
838			&checkpoint,
839		)?;
840		lock.put(record).await?;
841		Ok(())
842	}
843
844	async fn upsert_wallet_action_checkpoint(
845		&self,
846		id: &WalletActionId,
847		checkpoint: &WalletActionCheckpoint,
848	) -> anyhow::Result<()> {
849		let record = Record::from_data(
850			partition::WALLET_ACTION_CHECKPOINT,
851			id.as_bytes(),
852			None,
853			checkpoint,
854		)?;
855		self.inner.write().await.put(record).await
856	}
857
858	async fn get_wallet_action_checkpoint(
859		&self,
860		id: &WalletActionId,
861	) -> anyhow::Result<Option<WalletActionCheckpoint>> {
862		match self.inner.read().await
863			.get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
864		{
865			Some(record) => Ok(Some(record.to_data()?)),
866			None => Ok(None),
867		}
868	}
869
870	async fn get_all_wallet_action_checkpoints(
871		&self,
872	) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
873		let records = self.inner.read().await
874			.get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
875		records.into_iter().map(|r| r.to_data()).collect()
876	}
877
878	async fn remove_wallet_action_checkpoint(
879		&self,
880		id: &WalletActionId,
881	) -> anyhow::Result<()> {
882		self.inner.write().await
883			.delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
884		Ok(())
885	}
886
887	async fn record_paid_invoice(
888		&self,
889		payment_hash: PaymentHash,
890		preimage: Preimage,
891	) -> anyhow::Result<()> {
892		let key = payment_hash.to_byte_array();
893		// Idempotent: preserve the original paid_at across retries.
894		let mut lock = self.inner.write().await;
895		if lock.get(partition::PAID_INVOICE, &key).await?.is_some() {
896			return Ok(());
897		}
898		let paid = PaidInvoice {
899			payment_hash,
900			preimage,
901			paid_at: chrono::Local::now(),
902		};
903		let record = Record::from_data(partition::PAID_INVOICE, &key, None, &paid)?;
904		lock.put(record).await
905	}
906
907	async fn get_paid_invoice(
908		&self,
909		payment_hash: PaymentHash,
910	) -> anyhow::Result<Option<PaidInvoice>> {
911		match self.inner.read().await
912			.get(partition::PAID_INVOICE, &payment_hash.to_byte_array()).await?
913		{
914			Some(record) => Ok(Some(record.to_data()?)),
915			None => Ok(None),
916		}
917	}
918
919	async fn store_lightning_receive(
920		&self,
921		payment_hash: PaymentHash,
922		preimage: Preimage,
923		invoice: &Bolt11Invoice,
924		htlc_recv_cltv_delta: BlockDelta,
925	) -> anyhow::Result<()> {
926		let lightning_receive = LightningReceive {
927			payment_hash,
928			payment_preimage: preimage,
929			invoice: invoice.clone(),
930			htlc_recv_cltv_delta,
931			htlc_vtxos: vec![],
932			movement_id: None,
933			finished_at: None,
934			preimage_revealed_at: None,
935		};
936
937		let record = Record::from_data(
938			partition::LIGHTNING_RECEIVE,
939			&payment_hash.to_byte_array(),
940			None,
941			&lightning_receive,
942		)?;
943		self.inner.write().await.put(record).await
944	}
945
946	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
947		let records = self.inner.read().await
948			.get_all(partition::LIGHTNING_RECEIVE).await?;
949		records
950			.into_iter()
951			.filter_map(|r| {
952				let receive = r.to_data::<LightningReceive>().ok()?;
953				if receive.finished_at.is_none() {
954					Some(Ok(receive))
955				} else {
956					None
957				}
958			})
959			.collect()
960	}
961
962	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
963		let mut lock = self.inner.write().await;
964
965		let pk = payment_hash.to_byte_array();
966		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
967			.context("lightning receive not found")?;
968		let mut lightning_receive: LightningReceive = record.to_data()?;
969
970		lightning_receive.preimage_revealed_at = Some(Local::now());
971
972		let updated_record = Record::from_data(
973			partition::LIGHTNING_RECEIVE,
974			&pk,
975			None,
976			&lightning_receive,
977		)?;
978		lock.put(updated_record).await
979	}
980
981	async fn update_lightning_receive(
982		&self,
983		payment_hash: PaymentHash,
984		vtxo_ids: &[VtxoId],
985		movement_id: MovementId,
986	) -> anyhow::Result<()> {
987		let mut lock = self.inner.write().await;
988		let pk = payment_hash.to_byte_array();
989		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
990			.context("lightning receive not found")?;
991		let mut lightning_receive: LightningReceive = record.to_data()?;
992
993		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
994		for vtxo_id in vtxo_ids {
995			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
996				.context("vtxo not found")?;
997			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
998		}
999
1000		lightning_receive.htlc_vtxos = htlc_vtxos;
1001		lightning_receive.movement_id = Some(movement_id);
1002
1003		let updated_record = Record::from_data(
1004			partition::LIGHTNING_RECEIVE,
1005			&pk,
1006			None,
1007			&lightning_receive,
1008		)?;
1009		lock.put(updated_record).await
1010	}
1011
1012	async fn fetch_lightning_receive_by_payment_hash(
1013		&self,
1014		payment_hash: PaymentHash,
1015	) -> anyhow::Result<Option<LightningReceive>> {
1016		match self.inner.read().await
1017			.get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1018		{
1019			Some(record) => Ok(Some(record.to_data()?)),
1020			None => Ok(None),
1021		}
1022	}
1023
1024	async fn finish_pending_lightning_receive(
1025		&self,
1026		payment_hash: PaymentHash,
1027	) -> anyhow::Result<()> {
1028		let mut lock = self.inner.write().await;
1029		let pk = payment_hash.to_byte_array();
1030		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1031			.context("lightning receive not found")?;
1032		let mut lightning_receive: LightningReceive = record.to_data()?;
1033
1034		lightning_receive.finished_at = Some(Local::now());
1035
1036		let updated_record = Record::from_data(
1037			partition::LIGHTNING_RECEIVE,
1038			&pk,
1039			None,
1040			&lightning_receive,
1041		)?;
1042		lock.put(updated_record).await
1043	}
1044
1045	async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1046		let record = Record::from_data(
1047			partition::PENDING_OFFBOARD,
1048			&pending.movement_id.to_bytes(),
1049			None,
1050			pending,
1051		)?;
1052		self.inner.write().await.put(record).await
1053	}
1054
1055	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1056		let records = self.inner.read().await
1057			.get_all(partition::PENDING_OFFBOARD).await?;
1058		records.into_iter().map(|r| r.to_data()).collect()
1059	}
1060
1061	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1062		self.inner.write().await
1063			.delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1064		Ok(())
1065	}
1066
1067	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1068		let record = Record::from_data(
1069			partition::EXIT_VTXO,
1070			&exit.vtxo_id.to_bytes(),
1071			None,
1072			exit,
1073		)?;
1074		self.inner.write().await.put(record).await
1075	}
1076
1077	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1078		self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1079		Ok(())
1080	}
1081
1082	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1083		let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1084		records.into_iter().map(|r| r.to_data()).collect()
1085	}
1086
1087	async fn store_exit_child_tx(
1088		&self,
1089		exit_txid: Txid,
1090		child_tx: &Transaction,
1091		origin: ExitTxOrigin,
1092	) -> anyhow::Result<()> {
1093		let exit_child = SerdeExitChildTx {
1094			child_tx: child_tx.clone(),
1095			origin,
1096		};
1097		let record = Record::from_data(
1098			partition::EXIT_CHILD_TX,
1099			&exit_txid.to_byte_array(),
1100			None,
1101			&exit_child,
1102		)?;
1103		self.inner.write().await.put(record).await
1104	}
1105
1106	async fn get_exit_child_tx(
1107		&self,
1108		exit_txid: Txid,
1109	) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1110		match self.inner.read().await
1111			.get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1112		{
1113			Some(record) => {
1114				let exit_child = record.to_data::<SerdeExitChildTx>()?;
1115				Ok(Some((exit_child.child_tx, exit_child.origin)))
1116			}
1117			None => Ok(None),
1118		}
1119	}
1120}
1121
#[cfg(test)]
mod tests {
	use super::*;

	/// A full-range query built for partition 0 with a limit of 10 must
	/// expose exactly those settings through its fields.
	#[test]
	fn storage_query_builder() {
		let built = Query::new_full_range(0).limit(10);

		assert_eq!(built.partition, 0);
		assert_eq!(built.limit, Some(10));
		assert_eq!(built.range, ..);
	}
}
1135
/// This module provides comprehensive tests for the operations of a
/// `StorageAdaptor` (`put`, `get`, `delete`, `query_sorted`, `get_all` and
/// `incremental_id`). Use these functions to validate custom implementations.
///
/// # Example
///
/// ```rust
/// use bark::persist::adaptor::test_suite;
///
/// #[tokio::test]
/// async fn test_my_custom_adaptor() {
///     let mut storage = MyCustomStorageAdaptor::new();
///     test_suite::run_all(&mut storage).await;
/// }
/// ```
1150#[cfg(test)]
1151pub mod test_suite {
1152	use super::*;
1153	use super::partition::LAST_IDS;
1154	use super::sort::SortKey;
1155
1156	async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1157		for partition in partitions {
1158			let records = storage.get_all(*partition).await?;
1159			for record in records {
1160				storage.delete(record.partition, &record.pk).await?;
1161			}
1162		}
1163		Ok(())
1164	}
1165
	/// Runs all test suites against the given storage adaptor.
	///
	/// The individual tests are awaited one after another on the same
	/// `storage`, so records written by one test are still present when the
	/// next one starts. Tests that need an empty partition clear it themselves
	/// through `clear_partitions`; the call order below is therefore part of
	/// the suite's behaviour and should not be rearranged casually.
	///
	/// A failing expectation in any test panics, which aborts the run.
	pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
		// put tests: insertion, overwrite, sort key handling, multiple partitions
		test_put_insert(storage).await;
		test_put_upsert(storage).await;
		test_put_with_sort_key(storage).await;
		test_put_without_sort_key(storage).await;
		test_put_multiple_partitions(storage).await;

		// get tests: lookup by exact key and reading back an overwritten record
		test_get_existing(storage).await;
		test_get_after_update(storage).await;

		// delete tests: existing key, missing key, repeated delete
		test_delete_existing(storage).await;
		test_delete_nonexistent(storage).await;
		test_delete_idempotent(storage).await;

		// query tests: ordering, limits, ranges and partition isolation of query_sorted
		test_query_empty_partition(storage).await;
		test_query_returns_partition_records(storage).await;
		test_query_ordering(storage).await;
		test_query_with_limit(storage).await;
		test_query_null_sort_key_excluded(storage).await;
		test_query_partition_isolation(storage).await;
		test_query_range(storage).await;
		test_query_exclusive_end_range(storage).await;
		test_query_full_range_limit_one(storage).await;

		// get_all tests: unsorted listing of a whole partition
		test_get_all_empty_partition(storage).await;
		test_get_all_returns_all_records(storage).await;
		test_get_all_includes_records_without_sort_key(storage).await;
		test_get_all_partition_isolation(storage).await;
		test_get_all_after_delete(storage).await;

		// incremental_id tests: per-partition counters
		test_incremental_id_starts_at_one(storage).await;
		test_incremental_id_increments(storage).await;
		test_incremental_id_partition_isolation(storage).await;
		test_incremental_id_persists_across_operations(storage).await;
	}
1208
1209	/// Tests that put inserts a new record.
1210	pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1211		let record = Record {
1212			pk: "put_insert_1".into(),
1213			partition: 0,
1214			sort_key: None,
1215			data: b"test data".to_vec(),
1216		};
1217
1218		storage.put(record).await.expect("put should succeed");
1219
1220		let retrieved = storage
1221			.get(0, b"put_insert_1")
1222			.await
1223			.expect("get should succeed")
1224			.expect("record should exist");
1225
1226		assert_eq!(retrieved.pk, b"put_insert_1");
1227		assert_eq!(retrieved.partition, 0);
1228		assert_eq!(retrieved.data, b"test data");
1229	}
1230
1231	/// Tests that put updates an existing record (upsert behavior).
1232	pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1233		let record1 = Record {
1234			pk: b"put_upsert_1".into(),
1235			partition: 0,
1236			sort_key: None,
1237			data: b"original".to_vec(),
1238		};
1239		storage.put(record1).await.expect("first put should succeed");
1240
1241		let record2 = Record {
1242			pk: "put_upsert_1".into(),
1243			partition: 0,
1244			sort_key: None,
1245			data: b"updated".to_vec(),
1246		};
1247		storage
1248			.put(record2)
1249			.await
1250			.expect("second put should succeed");
1251
1252		let retrieved = storage
1253			.get(0, b"put_upsert_1")
1254			.await
1255			.expect("get should succeed")
1256			.expect("record should exist");
1257
1258		assert_eq!(retrieved.data, b"updated", "data should be updated");
1259	}
1260
1261	/// Tests that put correctly stores the sort key.
1262	pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1263		let sort_key = SortKey::u32_asc(42);
1264		let record = Record {
1265			pk: b"put_sort_key_1".into(),
1266			partition: 0,
1267			sort_key: Some(sort_key.clone()),
1268			data: b"with sort key".to_vec(),
1269		};
1270
1271		storage.put(record).await.expect("put should succeed");
1272
1273		let retrieved = storage
1274			.get(0, b"put_sort_key_1")
1275			.await
1276			.expect("get should succeed")
1277			.expect("record should exist");
1278
1279		assert_eq!(retrieved.sort_key, Some(sort_key));
1280	}
1281
1282	/// Tests that put correctly handles records without sort keys.
1283	pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1284		let record = Record {
1285			pk: b"put_no_sort_key_1".into(),
1286			partition: 0,
1287			sort_key: None,
1288			data: b"no sort key".to_vec(),
1289		};
1290
1291		storage.put(record).await.expect("put should succeed");
1292
1293		let retrieved = storage
1294			.get(0, b"put_no_sort_key_1")
1295			.await
1296			.expect("get should succeed")
1297			.expect("record should exist");
1298
1299		assert!(retrieved.sort_key.is_none());
1300	}
1301
1302	/// Tests that put correctly handles multiple partitions.
1303	pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1304		let record_a = Record {
1305			pk: "put_multi_a".into(),
1306			partition: 0,
1307			sort_key: None,
1308			data: b"in partition a".to_vec(),
1309		};
1310		let record_b = Record {
1311			pk: "put_multi_b".into(),
1312			partition: 1,
1313			sort_key: None,
1314			data: b"in partition b".to_vec(),
1315		};
1316
1317		storage.put(record_a).await.expect("put a should succeed");
1318		storage.put(record_b).await.expect("put b should succeed");
1319
1320		let retrieved_a = storage
1321			.get(0, b"put_multi_a")
1322			.await
1323			.expect("get should succeed")
1324			.expect("record a should exist");
1325		let retrieved_b = storage
1326			.get(1, b"put_multi_b")
1327			.await
1328			.expect("get should succeed")
1329			.expect("record b should exist");
1330
1331		assert_eq!(retrieved_a.partition, 0);
1332		assert_eq!(retrieved_b.partition, 1);
1333	}
1334
1335	/// Tests that get returns an existing record.
1336	pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1337		let record = Record {
1338			pk: b"get_existing_1".into(),
1339			partition: 0,
1340			sort_key: Some(SortKey::u32_asc(100)),
1341			data: b"test".to_vec(),
1342		};
1343		storage.put(record).await.expect("put should succeed");
1344
1345		let retrieved = storage
1346			.get(0, b"get_existing_1")
1347			.await
1348			.expect("get should succeed");
1349
1350		assert!(retrieved.is_some());
1351		let retrieved = retrieved.unwrap();
1352		assert_eq!(retrieved.pk, b"get_existing_1");
1353		assert_eq!(retrieved.partition, 0);
1354		assert_eq!(retrieved.data, b"test");
1355
1356		// superset of the key
1357		assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1358		// subset of the key
1359		assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1360
1361		// non-existent key
1362		assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1363	}
1364
1365	/// Tests that get returns updated data after put.
1366	pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1367		let record1 = Record {
1368			pk: b"get_after_update_1".into(),
1369			partition: 0,
1370			sort_key: None,
1371			data: b"version1".to_vec(),
1372		};
1373		storage.put(record1).await.expect("put should succeed");
1374
1375		let record2 = Record {
1376			pk: b"get_after_update_1".into(),
1377			partition: 0,
1378			sort_key: None,
1379			data: b"version2".to_vec(),
1380		};
1381		storage.put(record2).await.expect("put should succeed");
1382
1383		let retrieved = storage
1384			.get(0, b"get_after_update_1")
1385			.await
1386			.expect("get should succeed")
1387			.expect("record should exist");
1388
1389		assert_eq!(retrieved.data, b"version2");
1390	}
1391
1392	/// Tests that delete removes an existing record and returns it.
1393	pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1394		let record = Record {
1395			pk: b"delete_existing_1".into(),
1396			partition: 0,
1397			sort_key: None,
1398			data: b"to delete".to_vec(),
1399		};
1400		storage.put(record.clone()).await.expect("put should succeed");
1401
1402		let deleted_record = storage
1403			.delete(0, b"delete_existing_1")
1404			.await
1405			.expect("delete should succeed");
1406
1407		assert_eq!(deleted_record, Some(record));
1408
1409		let retrieved = storage
1410			.get(0, b"delete_existing_1")
1411			.await
1412			.expect("get should succeed");
1413		assert!(retrieved.is_none(), "record should no longer exist");
1414	}
1415
1416	/// Tests that delete returns None for non-existent records.
1417	pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1418		let deleted_record = storage
1419			.delete(0, b"delete_nonexistent_does_not_exist")
1420			.await
1421			.expect("delete should succeed");
1422
1423		assert!(
1424			deleted_record.is_none(),
1425			"delete should return None for non-existent record"
1426		);
1427	}
1428
1429	/// Tests that delete is idempotent (second delete returns None).
1430	pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1431		let record = Record {
1432			pk: b"delete_idempotent_1".into(),
1433			partition: 0,
1434			sort_key: None,
1435			data: b"delete twice".to_vec(),
1436		};
1437		storage.put(record.clone()).await.expect("put should succeed");
1438
1439		let first_delete = storage
1440			.delete(0, b"delete_idempotent_1")
1441			.await
1442			.expect("first delete should succeed");
1443		let second_delete = storage
1444			.delete(0, b"delete_idempotent_1")
1445			.await
1446			.expect("second delete should succeed");
1447
1448		assert_eq!(first_delete, Some(record), "first delete should return the record");
1449		assert_eq!(second_delete, None, "second delete should return None");
1450	}
1451
1452	/// Tests that query returns empty results for empty partition.
1453	pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1454		clear_partitions(storage, &[0]).await.unwrap();
1455		let results = storage
1456			.query_sorted(Query::new_full_range(0))
1457			.await
1458			.expect("query should succeed");
1459
1460		assert!(results.is_empty());
1461	}
1462
1463	/// Tests that query returns all records in a partition.
1464	pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1465		clear_partitions(storage, &[0]).await.unwrap();
1466		for i in 0..3 {
1467			let record = Record {
1468				pk: format!("query_partition_{}", i).into(),
1469				partition: 0,
1470				sort_key: Some(SortKey::u32_asc(i)),
1471				data: format!("record_{}", i).as_bytes().to_vec(),
1472			};
1473			storage.put(record).await.expect("put should succeed");
1474		}
1475
1476		let results = storage
1477			.query_sorted(Query::new_full_range(0))
1478			.await
1479			.expect("query should succeed");
1480
1481		assert_eq!(results.len(), 3);
1482	}
1483
1484	/// Tests that query returns records in ascending sort key order by default.
1485	pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1486		clear_partitions(storage, &[0]).await.unwrap();
1487		// Insert in non-sequential order
1488		for i in [5, 2, 8, 1, 9] {
1489			let record = Record {
1490				pk: format!("query_asc_{}", i).into(),
1491				partition: 0,
1492				sort_key: Some(SortKey::u32_asc(i)),
1493				data: format!("record_{}", i).as_bytes().to_vec(),
1494			};
1495			storage.put(record).await.expect("put should succeed");
1496		}
1497
1498		let results = storage
1499			.query_sorted(Query::new_full_range(0))
1500			.await
1501			.expect("query should succeed");
1502
1503		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1504		assert_eq!(
1505			values,
1506			vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1507			"should be in ascending order"
1508		);
1509	}
1510
1511	/// Tests that query with limit returns at most N records.
1512	pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1513		clear_partitions(storage, &[0]).await.unwrap();
1514		for i in 0..10 {
1515			let record = Record {
1516				pk: format!("query_limit_{}", i).into(),
1517				partition: 0,
1518				sort_key: Some(SortKey::u32_asc(i)),
1519				data: format!("record_{}", i).as_bytes().to_vec(),
1520			};
1521			storage.put(record).await.expect("put should succeed");
1522		}
1523
1524		let results = storage
1525			.query_sorted(Query::new_full_range(0).limit(3))
1526			.await
1527			.expect("query should succeed");
1528
1529		assert_eq!(results.len(), 3);
1530		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1531		assert_eq!(
1532			values,
1533			vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1534			"should return first 3 records"
1535		);
1536	}
1537
1538	/// Tests that records without sort keys don't appear in query
1539	pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1540		clear_partitions(storage, &[0]).await.unwrap();
1541		// Records with sort keys
1542		let with_key_1 = Record {
1543			pk: "query_null_with_1".into(),
1544			partition: 0,
1545			sort_key: Some(SortKey::u32_asc(1)),
1546			data: b"with_key_1".to_vec(),
1547		};
1548		let with_key_2 = Record {
1549			pk: "query_null_with_2".into(),
1550			partition: 0,
1551			sort_key: Some(SortKey::u32_asc(2)),
1552			data: b"with_key_2".to_vec(),
1553		};
1554
1555		// Record without sort key
1556		let without_key = Record {
1557			pk: "query_null_without".into(),
1558			partition: 0,
1559			sort_key: None,
1560			data: b"no_key".to_vec(),
1561		};
1562
1563		storage.put(with_key_1).await.expect("put should succeed");
1564		storage.put(without_key).await.expect("put should succeed");
1565		storage.put(with_key_2).await.expect("put should succeed");
1566
1567		// query should only return records with sort keys
1568		let results_query = storage.query_sorted(Query::new_full_range(0)).await
1569			.expect("query should succeed");
1570		assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1571		assert_eq!(results_query[0].data, b"with_key_1");
1572		assert_eq!(results_query[1].data, b"with_key_2");
1573
1574		// get_all should return all records
1575		let results_all = storage.get_all(0).await
1576			.expect("get_all should succeed");
1577		assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1578
1579		// Verify all three records are present (order not guaranteed for get_all)
1580		let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1581		let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1582		let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1583		assert!(has_with_key_1, "get_all should include with_key_1");
1584		assert!(has_with_key_2, "get_all should include with_key_2");
1585		assert!(has_without_key, "get_all should include record without sort key");
1586	}
1587
1588	/// Tests that query only returns records from the specified partition.
1589	pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1590		clear_partitions(storage, &[0, 1]).await.unwrap();
1591		// Records in partition A
1592		for i in 0..3 {
1593			let record = Record {
1594				pk: format!("query_iso_a_{}", i).into(),
1595				partition: 0,
1596				sort_key: Some(SortKey::u32_asc(i)),
1597				data: format!("record_{}", i).as_bytes().to_vec(),
1598			};
1599			storage.put(record).await.expect("put should succeed");
1600		}
1601
1602		// Records in partition B
1603		for i in 0..5 {
1604			let record = Record {
1605				pk: format!("query_iso_b_{}", i).into(),
1606				partition: 1,
1607				sort_key: Some(SortKey::u32_asc(i)),
1608				data: format!("record_{}", i + 100).as_bytes().to_vec(),
1609			};
1610			storage.put(record).await.expect("put should succeed");
1611		}
1612
1613		let results_a = storage
1614			.query_sorted(Query::new_full_range(0))
1615			.await
1616			.expect("query should succeed");
1617
1618		let results_b = storage
1619			.query_sorted(Query::new_full_range(1))
1620			.await
1621			.expect("query should succeed");
1622
1623		assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1624		assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1625
1626		// Verify all results_a are from partition A
1627		assert!(results_a
1628			.iter()
1629			.all(|r| r.partition == 0));
1630
1631		// Verify all results_b are from partition B
1632		assert!(results_b
1633			.iter()
1634			.all(|r| r.partition == 1));
1635	}
1636
1637	/// Tests that query with start and end keys returns only records within the range.
1638	pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1639		clear_partitions(storage, &[0]).await.unwrap();
1640
1641		// Insert records with sort keys 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
1642		for i in 1..=10u32 {
1643			let record = Record {
1644				pk: format!("query_range_{}", i).into(),
1645				partition: 0,
1646				sort_key: Some(SortKey::u32_asc(i)),
1647				data: format!("record_{}", i).as_bytes().to_vec(),
1648			};
1649			storage.put(record).await.expect("put should succeed");
1650		}
1651
1652		// Query with start key only (>= 5)
1653		let results_start = storage
1654			.query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1655			.await
1656			.expect("query should succeed");
1657
1658		assert_eq!(results_start.len(), 6, "should return records 5-10");
1659		let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1660		assert_eq!(
1661			values,
1662			vec![
1663				b"record_5".to_vec(),
1664				b"record_6".to_vec(),
1665				b"record_7".to_vec(),
1666				b"record_8".to_vec(),
1667				b"record_9".to_vec(),
1668				b"record_10".to_vec(),
1669			],
1670			"should return records from 5 onwards"
1671		);
1672
1673		// Query with end key only (<= 3)
1674		let results_end = storage
1675			.query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1676			.await
1677			.expect("query should succeed");
1678
1679		assert_eq!(results_end.len(), 3, "should return records 1-3");
1680		let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1681		assert_eq!(
1682			values,
1683			vec![
1684				b"record_1".to_vec(),
1685				b"record_2".to_vec(),
1686				b"record_3".to_vec(),
1687			],
1688			"should return records up to 3"
1689		);
1690
1691		// Query with both start and end keys (3 <= x <= 7)
1692		let results_range = storage
1693			.query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1694			.await
1695			.expect("query should succeed");
1696
1697		assert_eq!(results_range.len(), 5, "should return records 3-7");
1698		let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1699		assert_eq!(
1700			values,
1701			vec![
1702				b"record_3".to_vec(),
1703				b"record_4".to_vec(),
1704				b"record_5".to_vec(),
1705				b"record_6".to_vec(),
1706				b"record_7".to_vec(),
1707			],
1708			"should return records in range 3-7"
1709		);
1710
1711		// Query range with limit
1712		let results_range_limit = storage
1713			.query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1714			.await
1715			.expect("query should succeed");
1716
1717		assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1718		let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1719		assert_eq!(
1720			values,
1721			vec![
1722				b"record_2".to_vec(),
1723				b"record_3".to_vec(),
1724				b"record_4".to_vec(),
1725			],
1726			"should return first 3 records in range"
1727		);
1728
1729		// Query range that matches no records
1730		let results_empty = storage
1731			.query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1732			.await
1733			.expect("query should succeed");
1734
1735		assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1736	}
1737
1738	/// Tests that exclusive end ranges (start..end) exclude the end bound.
1739	pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1740		clear_partitions(storage, &[0]).await.unwrap();
1741
1742		for i in 1..=10u32 {
1743			let record = Record {
1744				pk: format!("query_excl_{}", i).into(),
1745				partition: 0,
1746				sort_key: Some(SortKey::u32_asc(i)),
1747				data: format!("record_{}", i).as_bytes().to_vec(),
1748			};
1749			storage.put(record).await.expect("put should succeed");
1750		}
1751
1752		// Exclusive end: 3 <= x < 7 (should return records 3, 4, 5, 6)
1753		let results = storage
1754			.query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1755			.await
1756			.expect("query should succeed");
1757
1758		assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1759		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1760		assert_eq!(
1761			values,
1762			vec![
1763				b"record_3".to_vec(),
1764				b"record_4".to_vec(),
1765				b"record_5".to_vec(),
1766				b"record_6".to_vec(),
1767			],
1768		);
1769
1770		// Exclusive end only: x < 4 (should return records 1, 2, 3)
1771		let results = storage
1772			.query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1773			.await
1774			.expect("query should succeed");
1775
1776		assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1777		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1778		assert_eq!(
1779			values,
1780			vec![
1781				b"record_1".to_vec(),
1782				b"record_2".to_vec(),
1783				b"record_3".to_vec(),
1784			],
1785		);
1786	}
1787
1788	/// Tests that full-range query with limit 1 returns the first sorted record.
1789	pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1790		clear_partitions(storage, &[0]).await.unwrap();
1791
1792		for i in [5u32, 2, 8, 1, 9] {
1793			let record = Record {
1794				pk: format!("query_limit1_{}", i).into(),
1795				partition: 0,
1796				sort_key: Some(SortKey::u32_asc(i)),
1797				data: format!("record_{}", i).as_bytes().to_vec(),
1798			};
1799			storage.put(record).await.expect("put should succeed");
1800		}
1801
1802		let results = storage
1803			.query_sorted(Query::new_full_range(0).limit(1))
1804			.await
1805			.expect("query should succeed");
1806
1807		assert_eq!(results.len(), 1);
1808		assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1809	}
1810
1811	/// Tests that incremental_id returns 1 for the first call on a partition.
1812	pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1813		// Clear the LAST_IDS entry for partition 0
1814		storage.delete(LAST_IDS, b"0").await.unwrap();
1815		let id = storage.incremental_id(0).await
1816			.expect("incremental_id should succeed");
1817
1818		assert_eq!(id, 1, "first id should be 1");
1819	}
1820
1821	/// Tests that incremental_id increments on subsequent calls.
1822	pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1823		clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1824
1825		let id1 = storage.incremental_id(0).await
1826			.expect("incremental_id should succeed");
1827		let id2 = storage.incremental_id(0).await
1828			.expect("incremental_id should succeed");
1829		let id3 = storage.incremental_id(0).await
1830			.expect("incremental_id should succeed");
1831
1832		assert_eq!(id1, 1, "first id should be 1");
1833		assert_eq!(id2, 2, "second id should be 2");
1834		assert_eq!(id3, 3, "third id should be 3");
1835	}
1836
1837	/// Tests that incremental_id maintains separate sequences for different partitions.
1838	pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1839		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1840
1841		// Generate IDs for partition A
1842		let a1 = storage.incremental_id(0).await
1843			.expect("incremental_id should succeed");
1844		let a2 = storage.incremental_id(0).await
1845			.expect("incremental_id should succeed");
1846		let a3 = storage.incremental_id(0).await
1847			.expect("incremental_id should succeed");
1848
1849		// Generate IDs for partition B
1850		let b1 = storage.incremental_id(1).await
1851			.expect("incremental_id should succeed");
1852		let b2 = storage.incremental_id(1).await
1853			.expect("incremental_id should succeed");
1854
1855		// Partition A should have 1, 2, 3
1856		assert_eq!(a1, 1);
1857		assert_eq!(a2, 2);
1858		assert_eq!(a3, 3);
1859
1860		// Partition B should have its own sequence starting at 1
1861		assert_eq!(b1, 1);
1862		assert_eq!(b2, 2);
1863
1864		// Generate more for A - should continue from 3
1865		let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1866		assert_eq!(a4, 4);
1867	}
1868
1869	/// Tests that incremental_id persists its state correctly.
1870	pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1871		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1872
1873		// Generate some IDs
1874		let id1 = storage.incremental_id(0).await
1875			.expect("incremental_id should succeed");
1876		let id2 = storage.incremental_id(0).await
1877			.expect("incremental_id should succeed");
1878		assert_eq!(id1, 1);
1879		assert_eq!(id2, 2);
1880
1881		// Verify the stored value can be retrieved directly
1882		let stored = storage
1883			.get(LAST_IDS, &[0])
1884			.await
1885			.expect("get should succeed")
1886			.expect("id record should exist");
1887		let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1888		assert_eq!(stored_id, 2, "stored id should be 2");
1889
1890		// Continue generating - should pick up where we left off
1891		let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1892		assert_eq!(id3, 3);
1893	}
1894
1895	/// Tests that get_all returns empty results for empty partition.
1896	pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1897		clear_partitions(storage, &[0]).await.unwrap();
1898		let results = storage
1899			.get_all(0)
1900			.await
1901			.expect("get_all should succeed");
1902
1903		assert!(results.is_empty(), "get_all should return empty for empty partition");
1904	}
1905
1906	/// Tests that get_all returns all records in a partition.
1907	pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1908		clear_partitions(storage, &[0]).await.unwrap();
1909
1910		// Insert records with sort keys
1911		for i in 0..5 {
1912			let record = Record {
1913				pk: format!("get_all_{}", i).into(),
1914				partition: 0,
1915				sort_key: Some(SortKey::u32_asc(i)),
1916				data: format!("record_{}", i).as_bytes().to_vec(),
1917			};
1918			storage.put(record).await.expect("put should succeed");
1919		}
1920
1921		let results = storage
1922			.get_all(0)
1923			.await
1924			.expect("get_all should succeed");
1925
1926		assert_eq!(results.len(), 5, "get_all should return all 5 records");
1927
1928		// Verify all records are present (order not guaranteed)
1929		for i in 0..5 {
1930			let expected_data = format!("record_{}", i).as_bytes().to_vec();
1931			let found = results.iter().any(|r| r.data == expected_data);
1932			assert!(found, "get_all should include record_{}", i);
1933		}
1934	}
1935
1936	/// Tests that get_all includes records without sort keys.
1937	pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1938		clear_partitions(storage, &[0]).await.unwrap();
1939
1940		// Insert records with sort keys
1941		let with_key_1 = Record {
1942			pk: "get_all_with_1".into(),
1943			partition: 0,
1944			sort_key: Some(SortKey::u32_asc(1)),
1945			data: b"with_key_1".to_vec(),
1946		};
1947		let with_key_2 = Record {
1948			pk: "get_all_with_2".into(),
1949			partition: 0,
1950			sort_key: Some(SortKey::u32_asc(2)),
1951			data: b"with_key_2".to_vec(),
1952		};
1953
1954		// Insert records without sort keys
1955		let without_key_1 = Record {
1956			pk: "get_all_without_1".into(),
1957			partition: 0,
1958			sort_key: None,
1959			data: b"without_key_1".to_vec(),
1960		};
1961		let without_key_2 = Record {
1962			pk: "get_all_without_2".into(),
1963			partition: 0,
1964			sort_key: None,
1965			data: b"without_key_2".to_vec(),
1966		};
1967
1968		storage.put(with_key_1).await.expect("put should succeed");
1969		storage.put(without_key_1).await.expect("put should succeed");
1970		storage.put(with_key_2).await.expect("put should succeed");
1971		storage.put(without_key_2).await.expect("put should succeed");
1972
1973		let results = storage
1974			.get_all(0)
1975			.await
1976			.expect("get_all should succeed");
1977
1978		assert_eq!(results.len(), 4, "get_all should return all 4 records");
1979
1980		// Verify all records are present
1981		let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1982		let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1983		let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1984		let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1985
1986		assert!(has_with_1, "get_all should include with_key_1");
1987		assert!(has_with_2, "get_all should include with_key_2");
1988		assert!(has_without_1, "get_all should include without_key_1");
1989		assert!(has_without_2, "get_all should include without_key_2");
1990
1991		// Verify that query excludes records without sort keys
1992		let query_results = storage
1993			.query_sorted(Query::new_full_range(0))
1994			.await
1995			.expect("query should succeed");
1996
1997		assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
1998		let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
1999		assert!(!query_has_without, "query should not include records without sort keys");
2000	}
2001
2002	/// Tests that get_all only returns records from the specified partition.
2003	pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2004		clear_partitions(storage, &[0, 1]).await.unwrap();
2005
2006		// Insert records in partition 0
2007		for i in 0..3 {
2008			let record = Record {
2009				pk: format!("partition_0_{}", i).into(),
2010				partition: 0,
2011				sort_key: Some(SortKey::u32_asc(i)),
2012				data: format!("p0_record_{}", i).as_bytes().to_vec(),
2013			};
2014			storage.put(record).await.expect("put should succeed");
2015		}
2016
2017		// Insert records in partition 1
2018		for i in 0..2 {
2019			let record = Record {
2020				pk: format!("partition_1_{}", i).into(),
2021				partition: 1,
2022				sort_key: Some(SortKey::u32_asc(i)),
2023				data: format!("p1_record_{}", i).as_bytes().to_vec(),
2024			};
2025			storage.put(record).await.expect("put should succeed");
2026		}
2027
2028		// get_all partition 0
2029		let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2030		assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2031		assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2032
2033		// get_all partition 1
2034		let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2035		assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2036		assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2037	}
2038
2039	/// Tests that get_all reflects deletions.
2040	pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2041		clear_partitions(storage, &[0]).await.unwrap();
2042
2043		for i in 0..3u32 {
2044			let record = Record {
2045				pk: format!("get_all_del_{}", i).into(),
2046				partition: 0,
2047				sort_key: Some(SortKey::u32_asc(i)),
2048				data: format!("record_{}", i).as_bytes().to_vec(),
2049			};
2050			storage.put(record).await.expect("put should succeed");
2051		}
2052
2053		storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2054
2055		let results = storage.get_all(0).await.expect("get_all should succeed");
2056		assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2057
2058		let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2059		assert!(!has_deleted, "deleted record should not appear");
2060	}
2061}