Skip to main content

bark/persist/adaptor/
mod.rs

1//! Storage adaptor module providing the [StorageAdaptor] trait and blanket
2//! implementation of [BarkPersister] for any type implementing [StorageAdaptor].
3//!
4//! This module provides an optimized single-table storage abstraction that can be
5//! efficiently implemented on various backends (SQLite, Postgres, MongoDB, Firebase,
6//! in-memory, etc.).
7//!
8//! The design uses structured keys:
9//! - **Primary key (`pk`)**: Unique identifier for each record
10//! - **Partition key**: Groups related records for efficient querying
11//! - **Sort key**: Enables ordered iteration and range queries
12//!
13//! # Example
14//!
15//! ```rust
16//! # use bark::persist::adaptor::memory::MemoryStorageAdaptor;
17//! # use bark::persist::adaptor::{Query, Record, StorageAdaptor, SortKey};
18//!
19//! # async fn example() -> anyhow::Result<()> {
20//! // Create an in-memory storage adaptor
21//! let mut storage = MemoryStorageAdaptor::new();
22//!
23//! // Store a record sorted by a numeric field (ascending)
24//! let record = Record {
25//!     partition: 0,
26//!     pk: "item:1".into(),
27//!     sort_key: Some(SortKey::u32_asc(42)),
28//!     data: b"hello world".to_vec(),
29//! };
30//! storage.put(record).await?;
31//!
32//! // Query with efficient index scan
33//! let query = Query::new_full_range(0).limit(10);
34//! let records = storage.query_sorted(query).await?;
35//! # Ok(())
36//! # }
37//! ```
38
39mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{Invoice, PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71	Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75	LightningReceive, LightningSend, PendingBoard, PendingOffboard,
76	RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77	StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
84pub mod partition {
85	pub const PROPERTIES: u8 = 0;
86	#[allow(unused)]
87	pub const BDK_CHANGESET: u8 = 1;
88	pub const VTXO: u8 = 2;
89	pub const PUBLIC_KEY: u8 = 3;
90	pub const PENDING_BOARD: u8 = 4;
91	pub const ROUND_STATE: u8 = 5;
92	pub const MOVEMENT: u8 = 6;
93	pub const LIGHTNING_SEND: u8 = 7;
94	pub const LIGHTNING_RECEIVE: u8 = 8;
95	pub const EXIT_VTXO: u8 = 9;
96	pub const EXIT_CHILD_TX: u8 = 10;
97	pub const MAILBOX_CHECKPOINT: u8 = 11;
98	pub const PENDING_OFFBOARD: u8 = 12;
99	/// An index table for payment method to movement
100	pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
101	/// Per-action checkpoint payloads keyed by [WalletActionId].
102	pub const WALLET_ACTION_CHECKPOINT: u8 = 14;
103
104	pub const LAST_IDS: u8 = u8::MAX;
105}
106
107/// A storage record with structured keys.
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109pub struct Record {
110	/// Partition key for grouping related records (e.g., "vtxo", "movement").
111	///
112	/// Queries always filter by partition.
113	pub partition: u8,
114
115	/// Unique primary key
116	pub pk: Vec<u8>,
117
118	/// Optional sort key for ordered iteration within a partition.
119	///
120	/// Use [`SortKey::builder()`] to construct composite keys with
121	/// mixed sort directions.
122	///
123	/// This field may be set or changed after record insertion.
124	/// Implementation should support updating the sort key of a
125	/// record post-insert if needed.
126	pub sort_key: Option<SortKey>,
127
128	/// The record data encoded as JSON.
129	pub data: Vec<u8>,
130}
131
132impl Record {
133	/// Converts the record data to a typed value.
134	fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
135		serde_json::from_slice(&self.data).map_err(Into::into)
136	}
137
138	/// Creates a new record from a typed value.
139	fn from_data<T: Serialize>(
140		partition: u8,
141		pk: &[u8],
142		sort_key: Option<SortKey>,
143		data: &T,
144	) -> anyhow::Result<Record> {
145		Ok(Record {
146			partition,
147			pk: pk.to_vec(),
148			sort_key,
149			data: serde_json::to_vec(data)?,
150		})
151	}
152}
153
154/// A range of sort keys.
155pub trait QueryRange: RangeBounds<SortKey> + Send {}
156
157impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
158
159/// Query specification for retrieving sorted records from a partition.
160#[derive(Debug, Clone)]
161pub struct Query<R: QueryRange> {
162	/// Partition to query (required).
163	pub partition: u8,
164
165	/// Range of sort keys to query.
166	pub range: R,
167
168	/// Maximum number of records to return.
169	pub limit: Option<usize>,
170}
171
172impl<R: QueryRange> Query<R> {
173	/// Creates a new query for the given partition.
174	pub fn new(partition: u8, range: R) -> Self {
175		Self {
176			partition,
177			range,
178			limit: None,
179		}
180	}
181
182	/// Sets the maximum number of records to return.
183	///
184	/// If the range is greater than the limit, the query will
185	/// return the first `limit` records.
186	pub fn limit(mut self, limit: usize) -> Self {
187		self.limit = Some(limit);
188		self
189	}
190}
191
192impl Query<std::ops::RangeFull> {
193	pub fn new_full_range(partition: u8) -> Self {
194		Self::new(partition, ..)
195	}
196}
197
198fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
199	let body = pm.value_string();
200
201	let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
202	buf.extend(pm.type_str().as_bytes().iter().copied());
203	// nb we need to separate with a non-utf8 byte
204	// we use 0xfe so that we can easily query on prefix from <type>0xfe .. <type>0xff
205	buf.push(0xfe);
206	buf.extend(body.into_bytes());
207	buf
208}
209
210/// Storage adaptor trait for persistence backends.
211///
212/// This trait provides a minimal interface (5 methods) that can be efficiently
213/// implemented on various storage backends while enabling query optimization.
214///
215/// # Implementor's Guide
216///
217/// ## Simple backends (memory, file-based)
218///
219/// Store records in a map/list keyed by `(partition, pk)`.
220///
221/// - `query_sorted`: query sorted records by partition, excluding those without
222///	  a sort key defined.
223/// - `get_all`: return every record in the partition regardless of whether it
224///   has a sort key. No ordering is guaranteed.
225///
226/// ## Database backends (Postgres, MongoDB, Firebase, IndexedDB, etc.)
227///
228/// Create a single table with indexes:
229///
230/// ```sql
231/// CREATE TABLE storage (
232///     pk TEXT PRIMARY KEY,
233///     partition TEXT NOT NULL,
234///     sort_key BLOB,
235///     data BLOB NOT NULL
236/// );
237/// CREATE INDEX idx_partition_sort ON storage(partition, sort_key);
238/// ```
239///
240/// Translate [`Query`] to SQL:
241///
242/// ```sql
243/// -- query_sorted: only returns records with a sort key
244/// SELECT * FROM storage
245/// WHERE partition = :partition AND sort_key IS NOT NULL
246/// ORDER BY sort_key DESC
247/// ```
248///
249/// Translate [`get_all`](StorageAdaptor::get_all) to SQL:
250///
251/// ```sql
252/// -- get_all: returns all records in the partition, no ordering
253/// SELECT * FROM storage
254/// WHERE partition = :partition
255/// ```
256#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
257#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
258pub trait StorageAdaptor: Send + Sync + 'static {
259	/// Stores a record, inserting or updating by primary key.
260	async fn put(&mut self, record: Record) -> anyhow::Result<()>;
261
262	/// Retrieves a record by primary key.
263	///
264	/// Returns `None` if the record doesn't exist.
265	async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
266
267	/// Deletes a record by primary key.
268	///
269	/// Returns the deleted record if it existed, `None` otherwise.
270	async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272	/// Queries sorted records from a partition.
273	///
274	/// Results are ordered by sort key.
275	/// Records without a sort key must not be returned, to query all
276	/// records use [`StorageAdaptor::get_all`].
277	async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
278		-> anyhow::Result<Vec<Record>>;
279
280	/// Get all records in a partition.
281	///
282	/// No ordering guarantees are made.
283	async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
284
285	/// Increments the last partition id, then stores and returns the new id.
286	async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
287		let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
288			.map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
289		let next_partition_id = last_partition_id + 1;
290
291		let record = Record::from_data(
292			partition::LAST_IDS,
293			&[partition],
294			None,
295			&next_partition_id,
296		)?;
297
298		self.put(record).await?;
299		Ok(next_partition_id)
300	}
301}
302
303async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
304	match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
305		Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
306		None => Ok(None),
307	}
308}
309
310async fn get_check_vtxo_state<S: StorageAdaptor>(
311	adaptor: &S,
312	vtxo_id: VtxoId,
313	allowed_states: &[VtxoStateKind],
314) -> anyhow::Result<SerdeVtxo> {
315	let vtxo = get_vtxo(adaptor, vtxo_id).await?
316		.context("vtxo not found")?;
317
318	let current_state = vtxo.current_state().context("vtxo has no state")?;
319	if !allowed_states.contains(&current_state.kind()) {
320		bail!("current state {:?} not in allowed states {:?}",
321			current_state.kind(), allowed_states
322		);
323	}
324
325	Ok(vtxo)
326}
327
328async fn update_vtxo_state_checked<S: StorageAdaptor>(
329	adaptor: &mut S,
330	vtxo_id: VtxoId,
331	new_state: VtxoState,
332	allowed_old_states: &[VtxoStateKind],
333) -> anyhow::Result<WalletVtxo> {
334	let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
335
336	let sk = sort::vtxo_sort_key(
337		new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
338	);
339
340	serde_vtxo.states.push(new_state.clone());
341	let updated_record = Record::from_data(
342		partition::VTXO,
343		&vtxo_id.to_bytes(),
344		Some(sk),
345		&serde_vtxo,
346	)?;
347
348	adaptor.put(updated_record).await?;
349
350	Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
351}
352
353pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
354	inner: tokio::sync::RwLock<S>,
355}
356
357impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
358	pub fn new(inner: S) -> Self {
359		Self {
360			inner: tokio::sync::RwLock::new(inner),
361		}
362	}
363}
364
365/// Blanket implementation of `BarkPersister` for any type implementing `StorageAdaptor`.
366#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
367#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
368impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
369	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
370		let record = Record::from_data(
371			partition::PROPERTIES,
372			// NB: a single set of properties is stored, so no need for primary key
373			&[],
374			None,
375			properties,
376		)?;
377		self.inner.write().await.put(record).await
378	}
379
380	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
381		match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
382			Some(record) => Ok(Some(record.to_data()?)),
383			None => Ok(None),
384		}
385	}
386
387	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
388		let mut properties = match self.read_properties().await? {
389			Some(properties) => properties,
390			None => bail!("wallet not initialized"),
391		};
392
393		properties.server_pubkey = Some(server_pubkey);
394
395		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
396		self.inner.write().await.put(record).await
397	}
398
399	async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
400		let mut properties = match self.read_properties().await? {
401			Some(properties) => properties,
402			None => bail!("wallet not initialized"),
403		};
404
405		properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
406
407		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
408		self.inner.write().await.put(record).await
409	}
410
411	#[cfg(feature = "onchain-bdk")]
412	async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
413		match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
414			Some(record) => record.to_data(),
415			None => Ok(ChangeSet::default()),
416		}
417	}
418
419	#[cfg(feature = "onchain-bdk")]
420	async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
421		let mut current = self.initialize_bdk_wallet().await?;
422		current.merge(changeset.clone());
423
424		let record = Record::from_data(
425			partition::BDK_CHANGESET,
426			// NB: a single changeset is stored, so no need for primary key
427			&[],
428			None,
429			&current,
430		)?;
431		self.inner.write().await.put(record).await
432	}
433
434	async fn create_new_movement(
435		&self,
436		status: MovementStatus,
437		subsystem: &MovementSubsystem,
438		time: DateTime<Local>,
439	) -> anyhow::Result<MovementId> {
440		let mut lock = self.inner.write().await;
441
442		let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
443		let movement = Movement::new(id, status, subsystem, time);
444
445		let record = Record::from_data(
446			partition::MOVEMENT,
447			&id.to_bytes(),
448			Some(sort::movement_sort_key(&time)),
449			&movement,
450		)?;
451		lock.put(record).await?;
452
453		Ok(id)
454	}
455
456	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
457		let mut guard = self.inner.write().await;
458
459		let record = Record::from_data(
460			partition::MOVEMENT,
461			&movement.id.to_bytes(),
462			Some(sort::movement_sort_key(&movement.time.created_at)),
463			movement,
464		)?;
465		guard.put(record).await?;
466
467		// then add records for each payment method
468		let sent = movement.sent_to.iter().map(|d| &d.destination);
469		let rcvd = movement.received_on.iter().map(|d| &d.destination);
470		for pm in sent.chain(rcvd) {
471			let pm_bytes = serialize_payment_method(pm);
472			let primary_key = {
473				// We just need a unique key, but we will never query using this
474				let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
475				buf.extend(pm_bytes.iter().copied());
476				buf.extend(movement.id.to_bytes());
477				buf
478			};
479			let record = Record::from_data(
480				partition::MOVEMENT_PAYMENT_METHOD,
481				&primary_key,
482				Some(SortKey::from_bytes(pm_bytes)),
483				&movement.id.0,
484			)?;
485			guard.put(record).await?;
486		}
487
488		Ok(())
489	}
490
491	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
492		self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
493			.await?
494			.context("movement not found")?
495			.to_data()
496	}
497
498	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
499		let records = self.inner.read().await
500			.query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
501		records.into_iter().map(|r| r.to_data()).collect()
502	}
503
504	async fn get_movements_by_payment_method(
505		&self,
506		payment_method: &PaymentMethod,
507	) -> anyhow::Result<Vec<Movement>> {
508		let pm_bytes = serialize_payment_method(payment_method);
509		let sort_key = SortKey::from_bytes(pm_bytes);
510
511		let guard = self.inner.read().await;
512		let idx_recs = guard.query_sorted(Query::new(
513			partition::MOVEMENT_PAYMENT_METHOD,
514			sort_key.clone()..=sort_key,
515		)).await?;
516
517		let mut ret = Vec::with_capacity(idx_recs.len());
518		for idx_rec in idx_recs {
519			let id = MovementId::new(idx_rec.to_data::<u32>()
520				.context("corrupt db: movement payment method index value")?);
521
522			ret.push(
523				guard.get(partition::MOVEMENT, &id.to_bytes()).await?
524					.context("corrupt db: movement payment method entry for unknown movement")?
525					.to_data()
526					.context("corrupt db: invalid movement record")?
527			);
528		}
529		Ok(ret)
530	}
531
532	async fn store_pending_board(
533		&self,
534		vtxo: &Vtxo<Full>,
535		funding_tx: &Transaction,
536		movement_id: MovementId,
537	) -> anyhow::Result<()> {
538		let pending_board = PendingBoard {
539			vtxos: vec![vtxo.id()],
540			amount: vtxo.amount(),
541			funding_tx: funding_tx.clone(),
542			movement_id,
543		};
544
545		let record = Record::from_data(
546			partition::PENDING_BOARD,
547			&vtxo.id().to_bytes(),
548			None,
549			&pending_board,
550		)?;
551
552		self.inner.write().await.put(record).await
553	}
554
555	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
556		self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
557		Ok(())
558	}
559
560	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
561		let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
562		records
563			.into_iter()
564			.map(|r| {
565				let board = r.to_data::<PendingBoard>()?;
566				Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
567			})
568			.collect()
569	}
570
571	async fn get_pending_board_by_vtxo_id(
572		&self,
573		vtxo_id: VtxoId,
574	) -> anyhow::Result<Option<PendingBoard>> {
575		match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
576			Some(record) => Ok(Some(record.to_data()?)),
577			None => Ok(None),
578		}
579	}
580
581	async fn store_round_state_lock_vtxos(
582		&self,
583		round_state: &RoundState,
584	) -> anyhow::Result<RoundStateId> {
585		let mut lock = self.inner.write().await;
586
587		let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
588
589		let allowed_states = &[VtxoStateKind::Spendable];
590
591		// First check that the inputs are spendable
592		for vtxo in round_state.participation().inputs.iter() {
593			get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
594		}
595
596		for vtxo in round_state.participation().inputs.iter() {
597			update_vtxo_state_checked(
598				&mut *lock,
599				vtxo.id(),
600				VtxoState::Locked {
601					holder: round_state.movement_id
602						.map(|id| crate::vtxo::VtxoLockHolder::Movement { id }),
603				},
604				allowed_states,
605			).await?;
606		}
607
608		let serde_state = SerdeRoundState::from(round_state);
609		let record = Record::from_data(
610			partition::ROUND_STATE,
611			&id.to_bytes(),
612			Some(sort::SortKey::u32_asc(id.0)),
613			&serde_state,
614		)?;
615		lock.put(record).await?;
616
617		Ok(id)
618	}
619
620	async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
621		let serde_state = SerdeRoundState::from(round_state.state());
622		let record = Record::from_data(
623			partition::ROUND_STATE,
624			&round_state.id().to_bytes(),
625			Some(sort::SortKey::u32_asc(round_state.id().0)),
626			&serde_state,
627		)?;
628		self.inner.write().await.put(record).await
629	}
630
631	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
632		self.inner.write().await
633			.delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
634		Ok(())
635	}
636
637	async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
638		let record = self.inner.read().await
639			.get(partition::ROUND_STATE, &_id.to_bytes()).await?;
640		match record {
641			Some(r) => {
642				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
643				let id = RoundStateId(u32::from_be_bytes(pk_slice));
644				let state = r.to_data::<SerdeRoundState>()?.into();
645				Ok(Some(StoredRoundState::new(id, state)))
646			},
647			None => Ok(None),
648		}
649	}
650
651	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
652		let records = self.inner.read().await
653			.get_all(partition::ROUND_STATE).await?;
654		records.into_iter()
655			.map(|r| {
656				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
657				Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
658			})
659			.collect()
660	}
661
662	async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
663		let mut lock = self.inner.write().await;
664
665		for (vtxo, state) in vtxos {
666			let serde_vtxo = SerdeVtxo {
667				vtxo: (*vtxo).clone(),
668				states: vec![(*state).clone()],
669			};
670
671			let sk = sort::vtxo_sort_key(
672				state.kind(), vtxo.expiry_height(), vtxo.amount(),
673			);
674			let record = Record::from_data(
675				partition::VTXO,
676				&vtxo.id().to_bytes(),
677				Some(sk),
678				&serde_vtxo,
679			)?;
680			lock.put(record).await?;
681		}
682		Ok(())
683	}
684
685	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
686		let lock = self.inner.read().await;
687		match get_vtxo(&*lock, id).await? {
688			Some(serde_vtxo) => {
689				let state = serde_vtxo.current_state()
690					.context("vtxo has no state")?.clone();
691				Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
692			},
693			None => Ok(None),
694		}
695	}
696
697	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
698		let records = self.inner.read().await
699			.query_sorted(Query::new_full_range(partition::VTXO)).await?;
700
701		records
702			.into_iter()
703			.map(|r| {
704				let serde_vtxo = r.to_data::<SerdeVtxo>()?;
705				let state = serde_vtxo
706					.current_state()
707					.cloned()
708					.context("vtxo has no state")?;
709				Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
710			})
711			.collect()
712	}
713
714	async fn get_vtxos_by_state(
715		&self,
716		states: &[VtxoStateKind],
717	) -> anyhow::Result<Vec<WalletVtxo>> {
718		let lock = self.inner.read().await;
719
720		let range = |state: VtxoStateKind| {
721			let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
722			let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
723			(start, end)
724		};
725
726		let mut records = Vec::new();
727		for state in states {
728			let (start, end) = range(*state);
729			let query = Query::new(partition::VTXO, start..=end);
730
731			for record in lock.query_sorted(query).await? {
732				let serde_vtxo = record.to_data::<SerdeVtxo>()?;
733				let current_state = serde_vtxo.current_state()
734					.context("vtxo has no current state")?.clone();
735				debug_assert_eq!(current_state.kind(), *state);
736				records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
737			}
738		}
739
740		Ok(records)
741	}
742
743	async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
744		let lock = self.inner.read().await;
745		Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
746	}
747
748	async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
749		let lock = self.inner.read().await;
750		let mut out = Vec::with_capacity(ids.len());
751		for id in ids {
752			let serde_vtxo = get_vtxo(&*lock, *id).await?
753				.with_context(|| format!("vtxo {id} not found"))?;
754			out.push(serde_vtxo.vtxo);
755		}
756		Ok(out)
757	}
758
759	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
760		match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
761			Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
762			None => Ok(None),
763		}
764	}
765
766	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
767		match self.get_wallet_vtxo(id).await? {
768			Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
769			None => Ok(false),
770		}
771	}
772
773	async fn update_vtxo_state_checked(
774		&self,
775		vtxo_id: VtxoId,
776		new_state: VtxoState,
777		allowed_old_states: &[VtxoStateKind],
778	) -> anyhow::Result<WalletVtxo> {
779		let mut lock = self.inner.write().await;
780		update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
781	}
782
783	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
784		let vtxo_key = SerdeVtxoKey { index, public_key };
785		let record = Record::from_data(
786			partition::PUBLIC_KEY,
787			&public_key.serialize()[..],
788			Some(sort::SortKey::u64_desc(index as u64)),
789			&vtxo_key,
790		)?;
791		self.inner.write().await.put(record).await
792	}
793
794	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
795		// pks are sorted descending, so the first one is the highest index
796		let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
797		let records = self.inner.read().await.query_sorted(query).await?;
798
799		match records.into_iter().next() {
800			Some(record) => {
801				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
802				Ok(Some(vtxo_key.index))
803			}
804			None => Ok(None),
805		}
806	}
807
808	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
809		match self.inner.read().await
810			.get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
811		{
812			Some(record) => {
813				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
814				Ok(Some(vtxo_key.index))
815			}
816			None => Ok(None),
817		}
818	}
819
820	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
821		match self.inner.read().await
822			.get(partition::MAILBOX_CHECKPOINT, &[]).await?
823		{
824			Some(record) => Ok(record.to_data::<u64>()?),
825			None => Ok(0),
826		}
827	}
828
829	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
830		let mut lock = self.inner.write().await;
831		let record = Record::from_data(
832			partition::MAILBOX_CHECKPOINT,
833			&[],
834			None,
835			&checkpoint,
836		)?;
837		lock.put(record).await?;
838		Ok(())
839	}
840
841	async fn store_new_pending_lightning_send(
842		&self,
843		invoice: &Invoice,
844		amount: Amount,
845		fee: Amount,
846		vtxo_ids: &[VtxoId],
847		movement_id: MovementId,
848	) -> anyhow::Result<LightningSend> {
849		let mut lock = self.inner.write().await;
850		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
851		for vtxo_id in vtxo_ids {
852			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
853				.context("vtxo not found")?;
854			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
855		}
856
857		let lightning_send = LightningSend {
858			invoice: invoice.clone(),
859			amount,
860			fee,
861			htlc_vtxos,
862			preimage: None,
863			movement_id,
864			finished_at: None,
865		};
866
867		let record = Record::from_data(
868			partition::LIGHTNING_SEND,
869			&invoice.payment_hash().to_byte_array(),
870			None,
871			&lightning_send,
872		)?;
873
874		lock.put(record).await?;
875
876		Ok(lightning_send)
877	}
878
879	async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
880		let records = self.inner.read().await
881			.get_all(partition::LIGHTNING_SEND).await?;
882		records
883			.into_iter()
884			.filter_map(|r| {
885				let send = r.to_data::<LightningSend>().ok()?;
886				if send.finished_at.is_none() {
887					Some(Ok(send))
888				} else {
889					None
890				}
891			})
892			.collect()
893	}
894
895	async fn finish_lightning_send(
896		&self,
897		payment_hash: PaymentHash,
898		preimage: Option<Preimage>,
899	) -> anyhow::Result<()> {
900		let mut lock = self.inner.write().await;
901
902		let pk = payment_hash.to_byte_array();
903		let record = lock
904			.get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
905		let mut lightning_send: LightningSend = record.to_data()?;
906
907		lightning_send.preimage = preimage;
908		lightning_send.finished_at = Some(Local::now());
909
910		let updated_record = Record::from_data(
911			partition::LIGHTNING_SEND,
912			&pk,
913			None,
914			&lightning_send,
915		)?;
916		lock.put(updated_record).await?;
917
918		Ok(())
919	}
920
921	async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
922		self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
923		Ok(())
924	}
925
926	async fn get_lightning_send(
927		&self,
928		payment_hash: PaymentHash,
929	) -> anyhow::Result<Option<LightningSend>> {
930		match self.inner.read().await
931			.get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
932		{
933			Some(record) => Ok(Some(record.to_data()?)),
934			None => Ok(None),
935		}
936	}
937
938	async fn upsert_wallet_action_checkpoint(
939		&self,
940		id: &WalletActionId,
941		checkpoint: &WalletActionCheckpoint,
942	) -> anyhow::Result<()> {
943		let record = Record::from_data(
944			partition::WALLET_ACTION_CHECKPOINT,
945			id.as_bytes(),
946			None,
947			checkpoint,
948		)?;
949		self.inner.write().await.put(record).await
950	}
951
952	async fn get_wallet_action_checkpoint(
953		&self,
954		id: &WalletActionId,
955	) -> anyhow::Result<Option<WalletActionCheckpoint>> {
956		match self.inner.read().await
957			.get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
958		{
959			Some(record) => Ok(Some(record.to_data()?)),
960			None => Ok(None),
961		}
962	}
963
964	async fn get_all_wallet_action_checkpoints(
965		&self,
966	) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
967		let records = self.inner.read().await
968			.get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
969		records.into_iter().map(|r| r.to_data()).collect()
970	}
971
972	async fn remove_wallet_action_checkpoint(
973		&self,
974		id: &WalletActionId,
975	) -> anyhow::Result<()> {
976		self.inner.write().await
977			.delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
978		Ok(())
979	}
980
981	async fn store_lightning_receive(
982		&self,
983		payment_hash: PaymentHash,
984		preimage: Preimage,
985		invoice: &Bolt11Invoice,
986		htlc_recv_cltv_delta: BlockDelta,
987	) -> anyhow::Result<()> {
988		let lightning_receive = LightningReceive {
989			payment_hash,
990			payment_preimage: preimage,
991			invoice: invoice.clone(),
992			htlc_recv_cltv_delta,
993			htlc_vtxos: vec![],
994			movement_id: None,
995			finished_at: None,
996			preimage_revealed_at: None,
997		};
998
999		let record = Record::from_data(
1000			partition::LIGHTNING_RECEIVE,
1001			&payment_hash.to_byte_array(),
1002			None,
1003			&lightning_receive,
1004		)?;
1005		self.inner.write().await.put(record).await
1006	}
1007
1008	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
1009		let records = self.inner.read().await
1010			.get_all(partition::LIGHTNING_RECEIVE).await?;
1011		records
1012			.into_iter()
1013			.filter_map(|r| {
1014				let receive = r.to_data::<LightningReceive>().ok()?;
1015				if receive.finished_at.is_none() {
1016					Some(Ok(receive))
1017				} else {
1018					None
1019				}
1020			})
1021			.collect()
1022	}
1023
1024	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
1025		let mut lock = self.inner.write().await;
1026
1027		let pk = payment_hash.to_byte_array();
1028		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1029			.context("lightning receive not found")?;
1030		let mut lightning_receive: LightningReceive = record.to_data()?;
1031
1032		lightning_receive.preimage_revealed_at = Some(Local::now());
1033
1034		let updated_record = Record::from_data(
1035			partition::LIGHTNING_RECEIVE,
1036			&pk,
1037			None,
1038			&lightning_receive,
1039		)?;
1040		lock.put(updated_record).await
1041	}
1042
1043	async fn update_lightning_receive(
1044		&self,
1045		payment_hash: PaymentHash,
1046		vtxo_ids: &[VtxoId],
1047		movement_id: MovementId,
1048	) -> anyhow::Result<()> {
1049		let mut lock = self.inner.write().await;
1050		let pk = payment_hash.to_byte_array();
1051		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1052			.context("lightning receive not found")?;
1053		let mut lightning_receive: LightningReceive = record.to_data()?;
1054
1055		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
1056		for vtxo_id in vtxo_ids {
1057			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
1058				.context("vtxo not found")?;
1059			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
1060		}
1061
1062		lightning_receive.htlc_vtxos = htlc_vtxos;
1063		lightning_receive.movement_id = Some(movement_id);
1064
1065		let updated_record = Record::from_data(
1066			partition::LIGHTNING_RECEIVE,
1067			&pk,
1068			None,
1069			&lightning_receive,
1070		)?;
1071		lock.put(updated_record).await
1072	}
1073
1074	async fn fetch_lightning_receive_by_payment_hash(
1075		&self,
1076		payment_hash: PaymentHash,
1077	) -> anyhow::Result<Option<LightningReceive>> {
1078		match self.inner.read().await
1079			.get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1080		{
1081			Some(record) => Ok(Some(record.to_data()?)),
1082			None => Ok(None),
1083		}
1084	}
1085
1086	async fn finish_pending_lightning_receive(
1087		&self,
1088		payment_hash: PaymentHash,
1089	) -> anyhow::Result<()> {
1090		let mut lock = self.inner.write().await;
1091		let pk = payment_hash.to_byte_array();
1092		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1093			.context("lightning receive not found")?;
1094		let mut lightning_receive: LightningReceive = record.to_data()?;
1095
1096		lightning_receive.finished_at = Some(Local::now());
1097
1098		let updated_record = Record::from_data(
1099			partition::LIGHTNING_RECEIVE,
1100			&pk,
1101			None,
1102			&lightning_receive,
1103		)?;
1104		lock.put(updated_record).await
1105	}
1106
1107	async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1108		let record = Record::from_data(
1109			partition::PENDING_OFFBOARD,
1110			&pending.movement_id.to_bytes(),
1111			None,
1112			pending,
1113		)?;
1114		self.inner.write().await.put(record).await
1115	}
1116
1117	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1118		let records = self.inner.read().await
1119			.get_all(partition::PENDING_OFFBOARD).await?;
1120		records.into_iter().map(|r| r.to_data()).collect()
1121	}
1122
1123	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1124		self.inner.write().await
1125			.delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1126		Ok(())
1127	}
1128
1129	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1130		let record = Record::from_data(
1131			partition::EXIT_VTXO,
1132			&exit.vtxo_id.to_bytes(),
1133			None,
1134			exit,
1135		)?;
1136		self.inner.write().await.put(record).await
1137	}
1138
1139	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1140		self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1141		Ok(())
1142	}
1143
1144	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1145		let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1146		records.into_iter().map(|r| r.to_data()).collect()
1147	}
1148
1149	async fn store_exit_child_tx(
1150		&self,
1151		exit_txid: Txid,
1152		child_tx: &Transaction,
1153		origin: ExitTxOrigin,
1154	) -> anyhow::Result<()> {
1155		let exit_child = SerdeExitChildTx {
1156			child_tx: child_tx.clone(),
1157			origin,
1158		};
1159		let record = Record::from_data(
1160			partition::EXIT_CHILD_TX,
1161			&exit_txid.to_byte_array(),
1162			None,
1163			&exit_child,
1164		)?;
1165		self.inner.write().await.put(record).await
1166	}
1167
1168	async fn get_exit_child_tx(
1169		&self,
1170		exit_txid: Txid,
1171	) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1172		match self.inner.read().await
1173			.get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1174		{
1175			Some(record) => {
1176				let exit_child = record.to_data::<SerdeExitChildTx>()?;
1177				Ok(Some((exit_child.child_tx, exit_child.origin)))
1178			}
1179			None => Ok(None),
1180		}
1181	}
1182}
1183
1184#[cfg(test)]
1185mod tests {
1186	use super::*;
1187
1188	#[test]
1189	fn storage_query_builder() {
1190		let query = Query::new_full_range(0).limit(10);
1191
1192		assert_eq!(query.partition, 0);
1193		assert_eq!(query.limit, Some(10));
1194		assert_eq!(query.range, ..);
1195	}
1196}
1197
1198/// This module provides comprehensive tests for all four methods of the
1199/// `StorageAdaptor` trait. Use these functions to validate custom implementations.
1200///
1201/// # Example
1202///
1203/// ```rust
1204/// use bark::persist::adaptor::memory::test_suite;
1205///
1206/// #[tokio::test]
1207/// async fn test_my_custom_adaptor() {
1208///     let storage = MyCustomStorageAdaptor::new();
1209///     test_suite::run_all(&storage).await;
1210/// }
1211/// ```
1212#[cfg(test)]
1213pub mod test_suite {
1214	use super::*;
1215	use super::partition::LAST_IDS;
1216	use super::sort::SortKey;
1217
1218	async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1219		for partition in partitions {
1220			let records = storage.get_all(*partition).await?;
1221			for record in records {
1222				storage.delete(record.partition, &record.pk).await?;
1223			}
1224		}
1225		Ok(())
1226	}
1227
1228	/// Runs all test suites against the given storage adaptor.
1229	pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1230		// put tests
1231		test_put_insert(storage).await;
1232		test_put_upsert(storage).await;
1233		test_put_with_sort_key(storage).await;
1234		test_put_without_sort_key(storage).await;
1235		test_put_multiple_partitions(storage).await;
1236
1237		// get tests
1238		test_get_existing(storage).await;
1239		test_get_after_update(storage).await;
1240
1241		// delete tests
1242		test_delete_existing(storage).await;
1243		test_delete_nonexistent(storage).await;
1244		test_delete_idempotent(storage).await;
1245
1246		// query tests
1247		test_query_empty_partition(storage).await;
1248		test_query_returns_partition_records(storage).await;
1249		test_query_ordering(storage).await;
1250		test_query_with_limit(storage).await;
1251		test_query_null_sort_key_excluded(storage).await;
1252		test_query_partition_isolation(storage).await;
1253		test_query_range(storage).await;
1254		test_query_exclusive_end_range(storage).await;
1255		test_query_full_range_limit_one(storage).await;
1256
1257		// get_all tests
1258		test_get_all_empty_partition(storage).await;
1259		test_get_all_returns_all_records(storage).await;
1260		test_get_all_includes_records_without_sort_key(storage).await;
1261		test_get_all_partition_isolation(storage).await;
1262		test_get_all_after_delete(storage).await;
1263
1264		// incremental_id tests
1265		test_incremental_id_starts_at_one(storage).await;
1266		test_incremental_id_increments(storage).await;
1267		test_incremental_id_partition_isolation(storage).await;
1268		test_incremental_id_persists_across_operations(storage).await;
1269	}
1270
1271	/// Tests that put inserts a new record.
1272	pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1273		let record = Record {
1274			pk: "put_insert_1".into(),
1275			partition: 0,
1276			sort_key: None,
1277			data: b"test data".to_vec(),
1278		};
1279
1280		storage.put(record).await.expect("put should succeed");
1281
1282		let retrieved = storage
1283			.get(0, b"put_insert_1")
1284			.await
1285			.expect("get should succeed")
1286			.expect("record should exist");
1287
1288		assert_eq!(retrieved.pk, b"put_insert_1");
1289		assert_eq!(retrieved.partition, 0);
1290		assert_eq!(retrieved.data, b"test data");
1291	}
1292
1293	/// Tests that put updates an existing record (upsert behavior).
1294	pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1295		let record1 = Record {
1296			pk: b"put_upsert_1".into(),
1297			partition: 0,
1298			sort_key: None,
1299			data: b"original".to_vec(),
1300		};
1301		storage.put(record1).await.expect("first put should succeed");
1302
1303		let record2 = Record {
1304			pk: "put_upsert_1".into(),
1305			partition: 0,
1306			sort_key: None,
1307			data: b"updated".to_vec(),
1308		};
1309		storage
1310			.put(record2)
1311			.await
1312			.expect("second put should succeed");
1313
1314		let retrieved = storage
1315			.get(0, b"put_upsert_1")
1316			.await
1317			.expect("get should succeed")
1318			.expect("record should exist");
1319
1320		assert_eq!(retrieved.data, b"updated", "data should be updated");
1321	}
1322
1323	/// Tests that put correctly stores the sort key.
1324	pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1325		let sort_key = SortKey::u32_asc(42);
1326		let record = Record {
1327			pk: b"put_sort_key_1".into(),
1328			partition: 0,
1329			sort_key: Some(sort_key.clone()),
1330			data: b"with sort key".to_vec(),
1331		};
1332
1333		storage.put(record).await.expect("put should succeed");
1334
1335		let retrieved = storage
1336			.get(0, b"put_sort_key_1")
1337			.await
1338			.expect("get should succeed")
1339			.expect("record should exist");
1340
1341		assert_eq!(retrieved.sort_key, Some(sort_key));
1342	}
1343
1344	/// Tests that put correctly handles records without sort keys.
1345	pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1346		let record = Record {
1347			pk: b"put_no_sort_key_1".into(),
1348			partition: 0,
1349			sort_key: None,
1350			data: b"no sort key".to_vec(),
1351		};
1352
1353		storage.put(record).await.expect("put should succeed");
1354
1355		let retrieved = storage
1356			.get(0, b"put_no_sort_key_1")
1357			.await
1358			.expect("get should succeed")
1359			.expect("record should exist");
1360
1361		assert!(retrieved.sort_key.is_none());
1362	}
1363
1364	/// Tests that put correctly handles multiple partitions.
1365	pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1366		let record_a = Record {
1367			pk: "put_multi_a".into(),
1368			partition: 0,
1369			sort_key: None,
1370			data: b"in partition a".to_vec(),
1371		};
1372		let record_b = Record {
1373			pk: "put_multi_b".into(),
1374			partition: 1,
1375			sort_key: None,
1376			data: b"in partition b".to_vec(),
1377		};
1378
1379		storage.put(record_a).await.expect("put a should succeed");
1380		storage.put(record_b).await.expect("put b should succeed");
1381
1382		let retrieved_a = storage
1383			.get(0, b"put_multi_a")
1384			.await
1385			.expect("get should succeed")
1386			.expect("record a should exist");
1387		let retrieved_b = storage
1388			.get(1, b"put_multi_b")
1389			.await
1390			.expect("get should succeed")
1391			.expect("record b should exist");
1392
1393		assert_eq!(retrieved_a.partition, 0);
1394		assert_eq!(retrieved_b.partition, 1);
1395	}
1396
1397	/// Tests that get returns an existing record.
1398	pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1399		let record = Record {
1400			pk: b"get_existing_1".into(),
1401			partition: 0,
1402			sort_key: Some(SortKey::u32_asc(100)),
1403			data: b"test".to_vec(),
1404		};
1405		storage.put(record).await.expect("put should succeed");
1406
1407		let retrieved = storage
1408			.get(0, b"get_existing_1")
1409			.await
1410			.expect("get should succeed");
1411
1412		assert!(retrieved.is_some());
1413		let retrieved = retrieved.unwrap();
1414		assert_eq!(retrieved.pk, b"get_existing_1");
1415		assert_eq!(retrieved.partition, 0);
1416		assert_eq!(retrieved.data, b"test");
1417
1418		// superset of the key
1419		assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1420		// subset of the key
1421		assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1422
1423		// non-existent key
1424		assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1425	}
1426
1427	/// Tests that get returns updated data after put.
1428	pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1429		let record1 = Record {
1430			pk: b"get_after_update_1".into(),
1431			partition: 0,
1432			sort_key: None,
1433			data: b"version1".to_vec(),
1434		};
1435		storage.put(record1).await.expect("put should succeed");
1436
1437		let record2 = Record {
1438			pk: b"get_after_update_1".into(),
1439			partition: 0,
1440			sort_key: None,
1441			data: b"version2".to_vec(),
1442		};
1443		storage.put(record2).await.expect("put should succeed");
1444
1445		let retrieved = storage
1446			.get(0, b"get_after_update_1")
1447			.await
1448			.expect("get should succeed")
1449			.expect("record should exist");
1450
1451		assert_eq!(retrieved.data, b"version2");
1452	}
1453
1454	/// Tests that delete removes an existing record and returns it.
1455	pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1456		let record = Record {
1457			pk: b"delete_existing_1".into(),
1458			partition: 0,
1459			sort_key: None,
1460			data: b"to delete".to_vec(),
1461		};
1462		storage.put(record.clone()).await.expect("put should succeed");
1463
1464		let deleted_record = storage
1465			.delete(0, b"delete_existing_1")
1466			.await
1467			.expect("delete should succeed");
1468
1469		assert_eq!(deleted_record, Some(record));
1470
1471		let retrieved = storage
1472			.get(0, b"delete_existing_1")
1473			.await
1474			.expect("get should succeed");
1475		assert!(retrieved.is_none(), "record should no longer exist");
1476	}
1477
1478	/// Tests that delete returns None for non-existent records.
1479	pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1480		let deleted_record = storage
1481			.delete(0, b"delete_nonexistent_does_not_exist")
1482			.await
1483			.expect("delete should succeed");
1484
1485		assert!(
1486			deleted_record.is_none(),
1487			"delete should return None for non-existent record"
1488		);
1489	}
1490
1491	/// Tests that delete is idempotent (second delete returns None).
1492	pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1493		let record = Record {
1494			pk: b"delete_idempotent_1".into(),
1495			partition: 0,
1496			sort_key: None,
1497			data: b"delete twice".to_vec(),
1498		};
1499		storage.put(record.clone()).await.expect("put should succeed");
1500
1501		let first_delete = storage
1502			.delete(0, b"delete_idempotent_1")
1503			.await
1504			.expect("first delete should succeed");
1505		let second_delete = storage
1506			.delete(0, b"delete_idempotent_1")
1507			.await
1508			.expect("second delete should succeed");
1509
1510		assert_eq!(first_delete, Some(record), "first delete should return the record");
1511		assert_eq!(second_delete, None, "second delete should return None");
1512	}
1513
1514	/// Tests that query returns empty results for empty partition.
1515	pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1516		clear_partitions(storage, &[0]).await.unwrap();
1517		let results = storage
1518			.query_sorted(Query::new_full_range(0))
1519			.await
1520			.expect("query should succeed");
1521
1522		assert!(results.is_empty());
1523	}
1524
1525	/// Tests that query returns all records in a partition.
1526	pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1527		clear_partitions(storage, &[0]).await.unwrap();
1528		for i in 0..3 {
1529			let record = Record {
1530				pk: format!("query_partition_{}", i).into(),
1531				partition: 0,
1532				sort_key: Some(SortKey::u32_asc(i)),
1533				data: format!("record_{}", i).as_bytes().to_vec(),
1534			};
1535			storage.put(record).await.expect("put should succeed");
1536		}
1537
1538		let results = storage
1539			.query_sorted(Query::new_full_range(0))
1540			.await
1541			.expect("query should succeed");
1542
1543		assert_eq!(results.len(), 3);
1544	}
1545
1546	/// Tests that query returns records in ascending sort key order by default.
1547	pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1548		clear_partitions(storage, &[0]).await.unwrap();
1549		// Insert in non-sequential order
1550		for i in [5, 2, 8, 1, 9] {
1551			let record = Record {
1552				pk: format!("query_asc_{}", i).into(),
1553				partition: 0,
1554				sort_key: Some(SortKey::u32_asc(i)),
1555				data: format!("record_{}", i).as_bytes().to_vec(),
1556			};
1557			storage.put(record).await.expect("put should succeed");
1558		}
1559
1560		let results = storage
1561			.query_sorted(Query::new_full_range(0))
1562			.await
1563			.expect("query should succeed");
1564
1565		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1566		assert_eq!(
1567			values,
1568			vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1569			"should be in ascending order"
1570		);
1571	}
1572
1573	/// Tests that query with limit returns at most N records.
1574	pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1575		clear_partitions(storage, &[0]).await.unwrap();
1576		for i in 0..10 {
1577			let record = Record {
1578				pk: format!("query_limit_{}", i).into(),
1579				partition: 0,
1580				sort_key: Some(SortKey::u32_asc(i)),
1581				data: format!("record_{}", i).as_bytes().to_vec(),
1582			};
1583			storage.put(record).await.expect("put should succeed");
1584		}
1585
1586		let results = storage
1587			.query_sorted(Query::new_full_range(0).limit(3))
1588			.await
1589			.expect("query should succeed");
1590
1591		assert_eq!(results.len(), 3);
1592		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1593		assert_eq!(
1594			values,
1595			vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1596			"should return first 3 records"
1597		);
1598	}
1599
1600	/// Tests that records without sort keys don't appear in query
1601	pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1602		clear_partitions(storage, &[0]).await.unwrap();
1603		// Records with sort keys
1604		let with_key_1 = Record {
1605			pk: "query_null_with_1".into(),
1606			partition: 0,
1607			sort_key: Some(SortKey::u32_asc(1)),
1608			data: b"with_key_1".to_vec(),
1609		};
1610		let with_key_2 = Record {
1611			pk: "query_null_with_2".into(),
1612			partition: 0,
1613			sort_key: Some(SortKey::u32_asc(2)),
1614			data: b"with_key_2".to_vec(),
1615		};
1616
1617		// Record without sort key
1618		let without_key = Record {
1619			pk: "query_null_without".into(),
1620			partition: 0,
1621			sort_key: None,
1622			data: b"no_key".to_vec(),
1623		};
1624
1625		storage.put(with_key_1).await.expect("put should succeed");
1626		storage.put(without_key).await.expect("put should succeed");
1627		storage.put(with_key_2).await.expect("put should succeed");
1628
1629		// query should only return records with sort keys
1630		let results_query = storage.query_sorted(Query::new_full_range(0)).await
1631			.expect("query should succeed");
1632		assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1633		assert_eq!(results_query[0].data, b"with_key_1");
1634		assert_eq!(results_query[1].data, b"with_key_2");
1635
1636		// get_all should return all records
1637		let results_all = storage.get_all(0).await
1638			.expect("get_all should succeed");
1639		assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1640
1641		// Verify all three records are present (order not guaranteed for get_all)
1642		let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1643		let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1644		let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1645		assert!(has_with_key_1, "get_all should include with_key_1");
1646		assert!(has_with_key_2, "get_all should include with_key_2");
1647		assert!(has_without_key, "get_all should include record without sort key");
1648	}
1649
1650	/// Tests that query only returns records from the specified partition.
1651	pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1652		clear_partitions(storage, &[0, 1]).await.unwrap();
1653		// Records in partition A
1654		for i in 0..3 {
1655			let record = Record {
1656				pk: format!("query_iso_a_{}", i).into(),
1657				partition: 0,
1658				sort_key: Some(SortKey::u32_asc(i)),
1659				data: format!("record_{}", i).as_bytes().to_vec(),
1660			};
1661			storage.put(record).await.expect("put should succeed");
1662		}
1663
1664		// Records in partition B
1665		for i in 0..5 {
1666			let record = Record {
1667				pk: format!("query_iso_b_{}", i).into(),
1668				partition: 1,
1669				sort_key: Some(SortKey::u32_asc(i)),
1670				data: format!("record_{}", i + 100).as_bytes().to_vec(),
1671			};
1672			storage.put(record).await.expect("put should succeed");
1673		}
1674
1675		let results_a = storage
1676			.query_sorted(Query::new_full_range(0))
1677			.await
1678			.expect("query should succeed");
1679
1680		let results_b = storage
1681			.query_sorted(Query::new_full_range(1))
1682			.await
1683			.expect("query should succeed");
1684
1685		assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1686		assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1687
1688		// Verify all results_a are from partition A
1689		assert!(results_a
1690			.iter()
1691			.all(|r| r.partition == 0));
1692
1693		// Verify all results_b are from partition B
1694		assert!(results_b
1695			.iter()
1696			.all(|r| r.partition == 1));
1697	}
1698
1699	/// Tests that query with start and end keys returns only records within the range.
1700	pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1701		clear_partitions(storage, &[0]).await.unwrap();
1702
1703		// Insert records with sort keys 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
1704		for i in 1..=10u32 {
1705			let record = Record {
1706				pk: format!("query_range_{}", i).into(),
1707				partition: 0,
1708				sort_key: Some(SortKey::u32_asc(i)),
1709				data: format!("record_{}", i).as_bytes().to_vec(),
1710			};
1711			storage.put(record).await.expect("put should succeed");
1712		}
1713
1714		// Query with start key only (>= 5)
1715		let results_start = storage
1716			.query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1717			.await
1718			.expect("query should succeed");
1719
1720		assert_eq!(results_start.len(), 6, "should return records 5-10");
1721		let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1722		assert_eq!(
1723			values,
1724			vec![
1725				b"record_5".to_vec(),
1726				b"record_6".to_vec(),
1727				b"record_7".to_vec(),
1728				b"record_8".to_vec(),
1729				b"record_9".to_vec(),
1730				b"record_10".to_vec(),
1731			],
1732			"should return records from 5 onwards"
1733		);
1734
1735		// Query with end key only (<= 3)
1736		let results_end = storage
1737			.query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1738			.await
1739			.expect("query should succeed");
1740
1741		assert_eq!(results_end.len(), 3, "should return records 1-3");
1742		let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1743		assert_eq!(
1744			values,
1745			vec![
1746				b"record_1".to_vec(),
1747				b"record_2".to_vec(),
1748				b"record_3".to_vec(),
1749			],
1750			"should return records up to 3"
1751		);
1752
1753		// Query with both start and end keys (3 <= x <= 7)
1754		let results_range = storage
1755			.query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1756			.await
1757			.expect("query should succeed");
1758
1759		assert_eq!(results_range.len(), 5, "should return records 3-7");
1760		let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1761		assert_eq!(
1762			values,
1763			vec![
1764				b"record_3".to_vec(),
1765				b"record_4".to_vec(),
1766				b"record_5".to_vec(),
1767				b"record_6".to_vec(),
1768				b"record_7".to_vec(),
1769			],
1770			"should return records in range 3-7"
1771		);
1772
1773		// Query range with limit
1774		let results_range_limit = storage
1775			.query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1776			.await
1777			.expect("query should succeed");
1778
1779		assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1780		let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1781		assert_eq!(
1782			values,
1783			vec![
1784				b"record_2".to_vec(),
1785				b"record_3".to_vec(),
1786				b"record_4".to_vec(),
1787			],
1788			"should return first 3 records in range"
1789		);
1790
1791		// Query range that matches no records
1792		let results_empty = storage
1793			.query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1794			.await
1795			.expect("query should succeed");
1796
1797		assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1798	}
1799
1800	/// Tests that exclusive end ranges (start..end) exclude the end bound.
1801	pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1802		clear_partitions(storage, &[0]).await.unwrap();
1803
1804		for i in 1..=10u32 {
1805			let record = Record {
1806				pk: format!("query_excl_{}", i).into(),
1807				partition: 0,
1808				sort_key: Some(SortKey::u32_asc(i)),
1809				data: format!("record_{}", i).as_bytes().to_vec(),
1810			};
1811			storage.put(record).await.expect("put should succeed");
1812		}
1813
1814		// Exclusive end: 3 <= x < 7 (should return records 3, 4, 5, 6)
1815		let results = storage
1816			.query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1817			.await
1818			.expect("query should succeed");
1819
1820		assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1821		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1822		assert_eq!(
1823			values,
1824			vec![
1825				b"record_3".to_vec(),
1826				b"record_4".to_vec(),
1827				b"record_5".to_vec(),
1828				b"record_6".to_vec(),
1829			],
1830		);
1831
1832		// Exclusive end only: x < 4 (should return records 1, 2, 3)
1833		let results = storage
1834			.query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1835			.await
1836			.expect("query should succeed");
1837
1838		assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1839		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1840		assert_eq!(
1841			values,
1842			vec![
1843				b"record_1".to_vec(),
1844				b"record_2".to_vec(),
1845				b"record_3".to_vec(),
1846			],
1847		);
1848	}
1849
1850	/// Tests that full-range query with limit 1 returns the first sorted record.
1851	pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1852		clear_partitions(storage, &[0]).await.unwrap();
1853
1854		for i in [5u32, 2, 8, 1, 9] {
1855			let record = Record {
1856				pk: format!("query_limit1_{}", i).into(),
1857				partition: 0,
1858				sort_key: Some(SortKey::u32_asc(i)),
1859				data: format!("record_{}", i).as_bytes().to_vec(),
1860			};
1861			storage.put(record).await.expect("put should succeed");
1862		}
1863
1864		let results = storage
1865			.query_sorted(Query::new_full_range(0).limit(1))
1866			.await
1867			.expect("query should succeed");
1868
1869		assert_eq!(results.len(), 1);
1870		assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1871	}
1872
1873	/// Tests that incremental_id returns 1 for the first call on a partition.
1874	pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1875		// Clear the LAST_IDS entry for partition 0
1876		storage.delete(LAST_IDS, b"0").await.unwrap();
1877		let id = storage.incremental_id(0).await
1878			.expect("incremental_id should succeed");
1879
1880		assert_eq!(id, 1, "first id should be 1");
1881	}
1882
1883	/// Tests that incremental_id increments on subsequent calls.
1884	pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1885		clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1886
1887		let id1 = storage.incremental_id(0).await
1888			.expect("incremental_id should succeed");
1889		let id2 = storage.incremental_id(0).await
1890			.expect("incremental_id should succeed");
1891		let id3 = storage.incremental_id(0).await
1892			.expect("incremental_id should succeed");
1893
1894		assert_eq!(id1, 1, "first id should be 1");
1895		assert_eq!(id2, 2, "second id should be 2");
1896		assert_eq!(id3, 3, "third id should be 3");
1897	}
1898
1899	/// Tests that incremental_id maintains separate sequences for different partitions.
1900	pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1901		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1902
1903		// Generate IDs for partition A
1904		let a1 = storage.incremental_id(0).await
1905			.expect("incremental_id should succeed");
1906		let a2 = storage.incremental_id(0).await
1907			.expect("incremental_id should succeed");
1908		let a3 = storage.incremental_id(0).await
1909			.expect("incremental_id should succeed");
1910
1911		// Generate IDs for partition B
1912		let b1 = storage.incremental_id(1).await
1913			.expect("incremental_id should succeed");
1914		let b2 = storage.incremental_id(1).await
1915			.expect("incremental_id should succeed");
1916
1917		// Partition A should have 1, 2, 3
1918		assert_eq!(a1, 1);
1919		assert_eq!(a2, 2);
1920		assert_eq!(a3, 3);
1921
1922		// Partition B should have its own sequence starting at 1
1923		assert_eq!(b1, 1);
1924		assert_eq!(b2, 2);
1925
1926		// Generate more for A - should continue from 3
1927		let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1928		assert_eq!(a4, 4);
1929	}
1930
1931	/// Tests that incremental_id persists its state correctly.
1932	pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1933		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1934
1935		// Generate some IDs
1936		let id1 = storage.incremental_id(0).await
1937			.expect("incremental_id should succeed");
1938		let id2 = storage.incremental_id(0).await
1939			.expect("incremental_id should succeed");
1940		assert_eq!(id1, 1);
1941		assert_eq!(id2, 2);
1942
1943		// Verify the stored value can be retrieved directly
1944		let stored = storage
1945			.get(LAST_IDS, &[0])
1946			.await
1947			.expect("get should succeed")
1948			.expect("id record should exist");
1949		let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1950		assert_eq!(stored_id, 2, "stored id should be 2");
1951
1952		// Continue generating - should pick up where we left off
1953		let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1954		assert_eq!(id3, 3);
1955	}
1956
1957	/// Tests that get_all returns empty results for empty partition.
1958	pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1959		clear_partitions(storage, &[0]).await.unwrap();
1960		let results = storage
1961			.get_all(0)
1962			.await
1963			.expect("get_all should succeed");
1964
1965		assert!(results.is_empty(), "get_all should return empty for empty partition");
1966	}
1967
1968	/// Tests that get_all returns all records in a partition.
1969	pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1970		clear_partitions(storage, &[0]).await.unwrap();
1971
1972		// Insert records with sort keys
1973		for i in 0..5 {
1974			let record = Record {
1975				pk: format!("get_all_{}", i).into(),
1976				partition: 0,
1977				sort_key: Some(SortKey::u32_asc(i)),
1978				data: format!("record_{}", i).as_bytes().to_vec(),
1979			};
1980			storage.put(record).await.expect("put should succeed");
1981		}
1982
1983		let results = storage
1984			.get_all(0)
1985			.await
1986			.expect("get_all should succeed");
1987
1988		assert_eq!(results.len(), 5, "get_all should return all 5 records");
1989
1990		// Verify all records are present (order not guaranteed)
1991		for i in 0..5 {
1992			let expected_data = format!("record_{}", i).as_bytes().to_vec();
1993			let found = results.iter().any(|r| r.data == expected_data);
1994			assert!(found, "get_all should include record_{}", i);
1995		}
1996	}
1997
1998	/// Tests that get_all includes records without sort keys.
1999	pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
2000		clear_partitions(storage, &[0]).await.unwrap();
2001
2002		// Insert records with sort keys
2003		let with_key_1 = Record {
2004			pk: "get_all_with_1".into(),
2005			partition: 0,
2006			sort_key: Some(SortKey::u32_asc(1)),
2007			data: b"with_key_1".to_vec(),
2008		};
2009		let with_key_2 = Record {
2010			pk: "get_all_with_2".into(),
2011			partition: 0,
2012			sort_key: Some(SortKey::u32_asc(2)),
2013			data: b"with_key_2".to_vec(),
2014		};
2015
2016		// Insert records without sort keys
2017		let without_key_1 = Record {
2018			pk: "get_all_without_1".into(),
2019			partition: 0,
2020			sort_key: None,
2021			data: b"without_key_1".to_vec(),
2022		};
2023		let without_key_2 = Record {
2024			pk: "get_all_without_2".into(),
2025			partition: 0,
2026			sort_key: None,
2027			data: b"without_key_2".to_vec(),
2028		};
2029
2030		storage.put(with_key_1).await.expect("put should succeed");
2031		storage.put(without_key_1).await.expect("put should succeed");
2032		storage.put(with_key_2).await.expect("put should succeed");
2033		storage.put(without_key_2).await.expect("put should succeed");
2034
2035		let results = storage
2036			.get_all(0)
2037			.await
2038			.expect("get_all should succeed");
2039
2040		assert_eq!(results.len(), 4, "get_all should return all 4 records");
2041
2042		// Verify all records are present
2043		let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
2044		let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
2045		let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
2046		let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
2047
2048		assert!(has_with_1, "get_all should include with_key_1");
2049		assert!(has_with_2, "get_all should include with_key_2");
2050		assert!(has_without_1, "get_all should include without_key_1");
2051		assert!(has_without_2, "get_all should include without_key_2");
2052
2053		// Verify that query excludes records without sort keys
2054		let query_results = storage
2055			.query_sorted(Query::new_full_range(0))
2056			.await
2057			.expect("query should succeed");
2058
2059		assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
2060		let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
2061		assert!(!query_has_without, "query should not include records without sort keys");
2062	}
2063
2064	/// Tests that get_all only returns records from the specified partition.
2065	pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2066		clear_partitions(storage, &[0, 1]).await.unwrap();
2067
2068		// Insert records in partition 0
2069		for i in 0..3 {
2070			let record = Record {
2071				pk: format!("partition_0_{}", i).into(),
2072				partition: 0,
2073				sort_key: Some(SortKey::u32_asc(i)),
2074				data: format!("p0_record_{}", i).as_bytes().to_vec(),
2075			};
2076			storage.put(record).await.expect("put should succeed");
2077		}
2078
2079		// Insert records in partition 1
2080		for i in 0..2 {
2081			let record = Record {
2082				pk: format!("partition_1_{}", i).into(),
2083				partition: 1,
2084				sort_key: Some(SortKey::u32_asc(i)),
2085				data: format!("p1_record_{}", i).as_bytes().to_vec(),
2086			};
2087			storage.put(record).await.expect("put should succeed");
2088		}
2089
2090		// get_all partition 0
2091		let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2092		assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2093		assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2094
2095		// get_all partition 1
2096		let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2097		assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2098		assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2099	}
2100
2101	/// Tests that get_all reflects deletions.
2102	pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2103		clear_partitions(storage, &[0]).await.unwrap();
2104
2105		for i in 0..3u32 {
2106			let record = Record {
2107				pk: format!("get_all_del_{}", i).into(),
2108				partition: 0,
2109				sort_key: Some(SortKey::u32_asc(i)),
2110				data: format!("record_{}", i).as_bytes().to_vec(),
2111			};
2112			storage.put(record).await.expect("put should succeed");
2113		}
2114
2115		storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2116
2117		let results = storage.get_all(0).await.expect("get_all should succeed");
2118		assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2119
2120		let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2121		assert!(!has_deleted, "deleted record should not appear");
2122	}
2123}