Skip to main content

bark/persist/adaptor/
mod.rs

1//! Storage adaptor module providing the [StorageAdaptor] trait and blanket
2//! implementation of [BarkPersister] for any type implementing [StorageAdaptor].
3//!
4//! This module provides an optimized single-table storage abstraction that can be
5//! efficiently implemented on various backends (SQLite, Postgres, MongoDB, Firebase,
6//! in-memory, etc.).
7//!
8//! The design uses structured keys:
9//! - **Primary key (`pk`)**: Unique identifier for each record
10//! - **Partition key**: Groups related records for efficient querying
11//! - **Sort key**: Enables ordered iteration and range queries
12//!
13//! # Example
14//!
15//! ```rust
16//! # use bark::persist::adaptor::memory::MemoryStorageAdaptor;
17//! # use bark::persist::adaptor::{Query, Record, StorageAdaptor, SortKey};
18//!
19//! # async fn example() -> anyhow::Result<()> {
20//! // Create an in-memory storage adaptor
21//! let mut storage = MemoryStorageAdaptor::new();
22//!
23//! // Store a record sorted by a numeric field (ascending)
24//! let record = Record {
25//!     partition: 0,
26//!     pk: "item:1".into(),
27//!     sort_key: Some(SortKey::u32_asc(42)),
28//!     data: b"hello world".to_vec(),
29//! };
30//! storage.put(record).await?;
31//!
32//! // Query with efficient index scan
33//! let query = Query::new_full_range(0).limit(10);
34//! let records = storage.query_sorted(query).await?;
35//! # Ok(())
36//! # }
37//! ```
38
39mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71	Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75	LightningReceive, PaidInvoice, PendingBoard, PendingOffboard,
76	RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77	StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
/// Partition identifiers used as the `partition` byte of stored records.
///
/// Each constant selects one kind of data inside the single storage table.
/// The values are written into every record, so they must remain stable.
pub mod partition {
	/// Wallet properties; a single record stored under the empty primary key.
	pub const PROPERTIES: u8 = 0;
	/// Merged BDK wallet changeset; a single record stored under the empty
	/// primary key. Only used when the `onchain-bdk` feature is enabled.
	#[allow(unused)]
	pub const BDK_CHANGESET: u8 = 1;
	/// VTXOs together with their state history, keyed by VTXO id.
	pub const VTXO: u8 = 2;
	/// VTXO keys, keyed by the serialized public key and sorted by key index.
	pub const PUBLIC_KEY: u8 = 3;
	/// Pending boards, keyed by the id of the board VTXO.
	pub const PENDING_BOARD: u8 = 4;
	/// Round states, keyed by round state id.
	pub const ROUND_STATE: u8 = 5;
	/// Movements, keyed by movement id.
	pub const MOVEMENT: u8 = 6;
	/// Was used by the now-removed `bark_lightning_send`. Do not reuse.
	#[allow(unused)]
	pub const LEGACY_LIGHTNING_SEND: u8 = 7;
	/// Lightning receives, keyed by payment hash.
	pub const LIGHTNING_RECEIVE: u8 = 8;
	// NOTE(review): the two exit partitions are not used in the visible part of
	// this file; presumably they hold exit VTXOs and exit child transactions.
	pub const EXIT_VTXO: u8 = 9;
	pub const EXIT_CHILD_TX: u8 = 10;
	/// Mailbox checkpoint; a single `u64` stored under the empty primary key.
	pub const MAILBOX_CHECKPOINT: u8 = 11;
	/// Pending offboards, keyed by movement id.
	pub const PENDING_OFFBOARD: u8 = 12;
	/// An index table for payment method to movement
	pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
	/// Per-action checkpoint payloads keyed by [WalletActionId].
	pub const WALLET_ACTION_CHECKPOINT: u8 = 14;
	/// Permanent record of every settled outgoing lightning payment,
	/// keyed by payment hash.
	pub const PAID_INVOICE: u8 = 15;

	/// Last handed-out incremental id of each partition, keyed by the
	/// partition byte.
	pub const LAST_IDS: u8 = u8::MAX;
}
111
/// A storage record with structured keys.
///
/// A record is addressed by the pair `(partition, pk)`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record {
	/// Partition key for grouping related records (one of the constants in
	/// the [`partition`] module, e.g. `partition::VTXO`).
	///
	/// Queries always filter by partition.
	pub partition: u8,

	/// Primary key, unique within the partition.
	///
	/// The same primary key may occur in different partitions; for instance
	/// several single-record partitions use the empty key.
	pub pk: Vec<u8>,

	/// Optional sort key for ordered iteration within a partition.
	///
	/// Use [`SortKey::builder()`] to construct composite keys with
	/// mixed sort directions.
	///
	/// This field may be set or changed after record insertion.
	/// Implementations should support updating the sort key of a
	/// record post-insert if needed.
	pub sort_key: Option<SortKey>,

	/// The record data encoded as JSON.
	pub data: Vec<u8>,
}
136
137impl Record {
138	/// Converts the record data to a typed value.
139	fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
140		serde_json::from_slice(&self.data).map_err(Into::into)
141	}
142
143	/// Creates a new record from a typed value.
144	fn from_data<T: Serialize>(
145		partition: u8,
146		pk: &[u8],
147		sort_key: Option<SortKey>,
148		data: &T,
149	) -> anyhow::Result<Record> {
150		Ok(Record {
151			partition,
152			pk: pk.to_vec(),
153			sort_key,
154			data: serde_json::to_vec(data)?,
155		})
156	}
157}
158
159/// A range of sort keys.
160pub trait QueryRange: RangeBounds<SortKey> + Send {}
161
162impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
163
/// Query specification for retrieving sorted records from a partition.
///
/// Build one with [`Query::new`] or [`Query::new_full_range`] and optionally
/// cap the result size with [`Query::limit`].
#[derive(Debug, Clone)]
pub struct Query<R: QueryRange> {
	/// Partition to query (required).
	pub partition: u8,

	/// Range of sort keys to query.
	pub range: R,

	/// Maximum number of records to return; `None` means no limit.
	pub limit: Option<usize>,
}
176
177impl<R: QueryRange> Query<R> {
178	/// Creates a new query for the given partition.
179	pub fn new(partition: u8, range: R) -> Self {
180		Self {
181			partition,
182			range,
183			limit: None,
184		}
185	}
186
187	/// Sets the maximum number of records to return.
188	///
189	/// If the range is greater than the limit, the query will
190	/// return the first `limit` records.
191	pub fn limit(mut self, limit: usize) -> Self {
192		self.limit = Some(limit);
193		self
194	}
195}
196
197impl Query<std::ops::RangeFull> {
198	pub fn new_full_range(partition: u8) -> Self {
199		Self::new(partition, ..)
200	}
201}
202
203fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
204	let body = pm.value_string();
205
206	let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
207	buf.extend(pm.type_str().as_bytes().iter().copied());
208	// nb we need to separate with a non-utf8 byte
209	// we use 0xfe so that we can easily query on prefix from <type>0xfe .. <type>0xff
210	buf.push(0xfe);
211	buf.extend(body.into_bytes());
212	buf
213}
214
215/// Storage adaptor trait for persistence backends.
216///
217/// This trait provides a minimal interface (5 methods) that can be efficiently
218/// implemented on various storage backends while enabling query optimization.
219///
220/// # Implementor's Guide
221///
222/// ## Simple backends (memory, file-based)
223///
224/// Store records in a map/list keyed by `(partition, pk)`.
225///
226/// - `query_sorted`: query sorted records by partition, excluding those without
227///	  a sort key defined.
228/// - `get_all`: return every record in the partition regardless of whether it
229///   has a sort key. No ordering is guaranteed.
230///
231/// ## Database backends (Postgres, MongoDB, Firebase, IndexedDB, etc.)
232///
233/// Create a single table with indexes:
234///
235/// ```sql
236/// CREATE TABLE storage (
237///     pk TEXT PRIMARY KEY,
238///     partition TEXT NOT NULL,
239///     sort_key BLOB,
240///     data BLOB NOT NULL
241/// );
242/// CREATE INDEX idx_partition_sort ON storage(partition, sort_key);
243/// ```
244///
245/// Translate [`Query`] to SQL:
246///
247/// ```sql
248/// -- query_sorted: only returns records with a sort key
249/// SELECT * FROM storage
250/// WHERE partition = :partition AND sort_key IS NOT NULL
251/// ORDER BY sort_key DESC
252/// ```
253///
254/// Translate [`get_all`](StorageAdaptor::get_all) to SQL:
255///
256/// ```sql
257/// -- get_all: returns all records in the partition, no ordering
258/// SELECT * FROM storage
259/// WHERE partition = :partition
260/// ```
261#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
263pub trait StorageAdaptor: Send + Sync + 'static {
264	/// Stores a record, inserting or updating by primary key.
265	async fn put(&mut self, record: Record) -> anyhow::Result<()>;
266
267	/// Retrieves a record by primary key.
268	///
269	/// Returns `None` if the record doesn't exist.
270	async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272	/// Deletes a record by primary key.
273	///
274	/// Returns the deleted record if it existed, `None` otherwise.
275	async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
276
277	/// Queries sorted records from a partition.
278	///
279	/// Results are ordered by sort key.
280	/// Records without a sort key must not be returned, to query all
281	/// records use [`StorageAdaptor::get_all`].
282	async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
283		-> anyhow::Result<Vec<Record>>;
284
285	/// Get all records in a partition.
286	///
287	/// No ordering guarantees are made.
288	async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
289
290	/// Increments the last partition id, then stores and returns the new id.
291	async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
292		let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
293			.map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
294		let next_partition_id = last_partition_id + 1;
295
296		let record = Record::from_data(
297			partition::LAST_IDS,
298			&[partition],
299			None,
300			&next_partition_id,
301		)?;
302
303		self.put(record).await?;
304		Ok(next_partition_id)
305	}
306}
307
308async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
309	match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
310		Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
311		None => Ok(None),
312	}
313}
314
315async fn get_check_vtxo_state<S: StorageAdaptor>(
316	adaptor: &S,
317	vtxo_id: VtxoId,
318	allowed_states: &[VtxoStateKind],
319) -> anyhow::Result<SerdeVtxo> {
320	let vtxo = get_vtxo(adaptor, vtxo_id).await?
321		.context("vtxo not found")?;
322
323	let current_state = vtxo.current_state().context("vtxo has no state")?;
324	if !allowed_states.contains(&current_state.kind()) {
325		bail!("current state {:?} not in allowed states {:?}",
326			current_state.kind(), allowed_states
327		);
328	}
329
330	Ok(vtxo)
331}
332
333async fn update_vtxo_state_checked<S: StorageAdaptor>(
334	adaptor: &mut S,
335	vtxo_id: VtxoId,
336	new_state: VtxoState,
337	allowed_old_states: &[VtxoStateKind],
338) -> anyhow::Result<WalletVtxo> {
339	let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
340
341	let sk = sort::vtxo_sort_key(
342		new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
343	);
344
345	serde_vtxo.states.push(new_state.clone());
346	let updated_record = Record::from_data(
347		partition::VTXO,
348		&vtxo_id.to_bytes(),
349		Some(sk),
350		&serde_vtxo,
351	)?;
352
353	adaptor.put(updated_record).await?;
354
355	Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
356}
357
/// Wraps a [`StorageAdaptor`] in an async read/write lock so it can be used
/// through shared references.
pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
	/// The wrapped adaptor; writers take the lock exclusively.
	inner: tokio::sync::RwLock<S>,
}
361
362impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
363	pub fn new(inner: S) -> Self {
364		Self {
365			inner: tokio::sync::RwLock::new(inner),
366		}
367	}
368}
369
370/// Blanket implementation of `BarkPersister` for any type implementing `StorageAdaptor`.
371#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
372#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
373impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
374	async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
375		let record = Record::from_data(
376			partition::PROPERTIES,
377			// NB: a single set of properties is stored, so no need for primary key
378			&[],
379			None,
380			properties,
381		)?;
382		self.inner.write().await.put(record).await
383	}
384
385	async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
386		match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
387			Some(record) => Ok(Some(record.to_data()?)),
388			None => Ok(None),
389		}
390	}
391
392	async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
393		let mut properties = match self.read_properties().await? {
394			Some(properties) => properties,
395			None => bail!("wallet not initialized"),
396		};
397
398		properties.server_pubkey = Some(server_pubkey);
399
400		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
401		self.inner.write().await.put(record).await
402	}
403
404	async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
405		let mut properties = match self.read_properties().await? {
406			Some(properties) => properties,
407			None => bail!("wallet not initialized"),
408		};
409
410		properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
411
412		let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
413		self.inner.write().await.put(record).await
414	}
415
416	#[cfg(feature = "onchain-bdk")]
417	async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
418		match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
419			Some(record) => record.to_data(),
420			None => Ok(ChangeSet::default()),
421		}
422	}
423
424	#[cfg(feature = "onchain-bdk")]
425	async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
426		let mut current = self.initialize_bdk_wallet().await?;
427		current.merge(changeset.clone());
428
429		let record = Record::from_data(
430			partition::BDK_CHANGESET,
431			// NB: a single changeset is stored, so no need for primary key
432			&[],
433			None,
434			&current,
435		)?;
436		self.inner.write().await.put(record).await
437	}
438
439	async fn create_new_movement(
440		&self,
441		status: MovementStatus,
442		subsystem: &MovementSubsystem,
443		time: DateTime<Local>,
444	) -> anyhow::Result<MovementId> {
445		let mut lock = self.inner.write().await;
446
447		let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
448		let movement = Movement::new(id, status, subsystem, time);
449
450		let record = Record::from_data(
451			partition::MOVEMENT,
452			&id.to_bytes(),
453			Some(sort::movement_sort_key(&time)),
454			&movement,
455		)?;
456		lock.put(record).await?;
457
458		Ok(id)
459	}
460
461	async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
462		let mut guard = self.inner.write().await;
463
464		let record = Record::from_data(
465			partition::MOVEMENT,
466			&movement.id.to_bytes(),
467			Some(sort::movement_sort_key(&movement.time.created_at)),
468			movement,
469		)?;
470		guard.put(record).await?;
471
472		// then add records for each payment method
473		let sent = movement.sent_to.iter().map(|d| &d.destination);
474		let rcvd = movement.received_on.iter().map(|d| &d.destination);
475		for pm in sent.chain(rcvd) {
476			let pm_bytes = serialize_payment_method(pm);
477			let primary_key = {
478				// We just need a unique key, but we will never query using this
479				let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
480				buf.extend(pm_bytes.iter().copied());
481				buf.extend(movement.id.to_bytes());
482				buf
483			};
484			let record = Record::from_data(
485				partition::MOVEMENT_PAYMENT_METHOD,
486				&primary_key,
487				Some(SortKey::from_bytes(pm_bytes)),
488				&movement.id.0,
489			)?;
490			guard.put(record).await?;
491		}
492
493		Ok(())
494	}
495
496	async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
497		self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
498			.await?
499			.context("movement not found")?
500			.to_data()
501	}
502
503	async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
504		let records = self.inner.read().await
505			.query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
506		records.into_iter().map(|r| r.to_data()).collect()
507	}
508
509	async fn get_movements_by_payment_method(
510		&self,
511		payment_method: &PaymentMethod,
512	) -> anyhow::Result<Vec<Movement>> {
513		let pm_bytes = serialize_payment_method(payment_method);
514		let sort_key = SortKey::from_bytes(pm_bytes);
515
516		let guard = self.inner.read().await;
517		let idx_recs = guard.query_sorted(Query::new(
518			partition::MOVEMENT_PAYMENT_METHOD,
519			sort_key.clone()..=sort_key,
520		)).await?;
521
522		let mut ret = Vec::with_capacity(idx_recs.len());
523		for idx_rec in idx_recs {
524			let id = MovementId::new(idx_rec.to_data::<u32>()
525				.context("corrupt db: movement payment method index value")?);
526
527			ret.push(
528				guard.get(partition::MOVEMENT, &id.to_bytes()).await?
529					.context("corrupt db: movement payment method entry for unknown movement")?
530					.to_data()
531					.context("corrupt db: invalid movement record")?
532			);
533		}
534		Ok(ret)
535	}
536
537	async fn store_pending_board(
538		&self,
539		vtxo: &Vtxo<Full>,
540		funding_tx: &Transaction,
541		movement_id: MovementId,
542	) -> anyhow::Result<()> {
543		let pending_board = PendingBoard {
544			vtxos: vec![vtxo.id()],
545			amount: vtxo.amount(),
546			funding_tx: funding_tx.clone(),
547			movement_id,
548		};
549
550		let record = Record::from_data(
551			partition::PENDING_BOARD,
552			&vtxo.id().to_bytes(),
553			None,
554			&pending_board,
555		)?;
556
557		self.inner.write().await.put(record).await
558	}
559
560	async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
561		self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
562		Ok(())
563	}
564
565	async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
566		let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
567		records
568			.into_iter()
569			.map(|r| {
570				let board = r.to_data::<PendingBoard>()?;
571				Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
572			})
573			.collect()
574	}
575
576	async fn get_pending_board_by_vtxo_id(
577		&self,
578		vtxo_id: VtxoId,
579	) -> anyhow::Result<Option<PendingBoard>> {
580		match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
581			Some(record) => Ok(Some(record.to_data()?)),
582			None => Ok(None),
583		}
584	}
585
586	async fn store_round_state_lock_vtxos(
587		&self,
588		round_state: &RoundState,
589	) -> anyhow::Result<RoundStateId> {
590		let mut lock = self.inner.write().await;
591
592		let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
593
594		let allowed_states = &[VtxoStateKind::Spendable];
595
596		// First check that the inputs are spendable
597		for vtxo in round_state.participation().inputs.iter() {
598			get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
599		}
600
601		for vtxo in round_state.participation().inputs.iter() {
602			update_vtxo_state_checked(
603				&mut *lock,
604				vtxo.id(),
605				VtxoState::Locked {
606					holder: round_state.movement_id
607						.map(|id| crate::vtxo::VtxoLockHolder::Movement { id }),
608				},
609				allowed_states,
610			).await?;
611		}
612
613		let serde_state = SerdeRoundState::from(round_state);
614		let record = Record::from_data(
615			partition::ROUND_STATE,
616			&id.to_bytes(),
617			Some(sort::SortKey::u32_asc(id.0)),
618			&serde_state,
619		)?;
620		lock.put(record).await?;
621
622		Ok(id)
623	}
624
625	async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
626		let serde_state = SerdeRoundState::from(round_state.state());
627		let record = Record::from_data(
628			partition::ROUND_STATE,
629			&round_state.id().to_bytes(),
630			Some(sort::SortKey::u32_asc(round_state.id().0)),
631			&serde_state,
632		)?;
633		self.inner.write().await.put(record).await
634	}
635
636	async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
637		self.inner.write().await
638			.delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
639		Ok(())
640	}
641
642	async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
643		let record = self.inner.read().await
644			.get(partition::ROUND_STATE, &_id.to_bytes()).await?;
645		match record {
646			Some(r) => {
647				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
648				let id = RoundStateId(u32::from_be_bytes(pk_slice));
649				let state = r.to_data::<SerdeRoundState>()?.into();
650				Ok(Some(StoredRoundState::new(id, state)))
651			},
652			None => Ok(None),
653		}
654	}
655
656	async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
657		let records = self.inner.read().await
658			.get_all(partition::ROUND_STATE).await?;
659		records.into_iter()
660			.map(|r| {
661				let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
662				Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
663			})
664			.collect()
665	}
666
667	async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
668		let mut lock = self.inner.write().await;
669
670		for (vtxo, state) in vtxos {
671			let serde_vtxo = SerdeVtxo {
672				vtxo: (*vtxo).clone(),
673				states: vec![(*state).clone()],
674			};
675
676			let sk = sort::vtxo_sort_key(
677				state.kind(), vtxo.expiry_height(), vtxo.amount(),
678			);
679			let record = Record::from_data(
680				partition::VTXO,
681				&vtxo.id().to_bytes(),
682				Some(sk),
683				&serde_vtxo,
684			)?;
685			lock.put(record).await?;
686		}
687		Ok(())
688	}
689
690	async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
691		let lock = self.inner.read().await;
692		match get_vtxo(&*lock, id).await? {
693			Some(serde_vtxo) => {
694				let state = serde_vtxo.current_state()
695					.context("vtxo has no state")?.clone();
696				Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
697			},
698			None => Ok(None),
699		}
700	}
701
702	async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
703		let records = self.inner.read().await
704			.query_sorted(Query::new_full_range(partition::VTXO)).await?;
705
706		records
707			.into_iter()
708			.map(|r| {
709				let serde_vtxo = r.to_data::<SerdeVtxo>()?;
710				let state = serde_vtxo
711					.current_state()
712					.cloned()
713					.context("vtxo has no state")?;
714				Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
715			})
716			.collect()
717	}
718
719	async fn get_vtxos_by_state(
720		&self,
721		states: &[VtxoStateKind],
722	) -> anyhow::Result<Vec<WalletVtxo>> {
723		let lock = self.inner.read().await;
724
725		let range = |state: VtxoStateKind| {
726			let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
727			let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
728			(start, end)
729		};
730
731		let mut records = Vec::new();
732		for state in states {
733			let (start, end) = range(*state);
734			let query = Query::new(partition::VTXO, start..=end);
735
736			for record in lock.query_sorted(query).await? {
737				let serde_vtxo = record.to_data::<SerdeVtxo>()?;
738				let current_state = serde_vtxo.current_state()
739					.context("vtxo has no current state")?.clone();
740				debug_assert_eq!(current_state.kind(), *state);
741				records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
742			}
743		}
744
745		Ok(records)
746	}
747
748	async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
749		let lock = self.inner.read().await;
750		Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
751	}
752
753	async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
754		let lock = self.inner.read().await;
755		let mut out = Vec::with_capacity(ids.len());
756		for id in ids {
757			let serde_vtxo = get_vtxo(&*lock, *id).await?
758				.with_context(|| format!("vtxo {id} not found"))?;
759			out.push(serde_vtxo.vtxo);
760		}
761		Ok(out)
762	}
763
764	async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
765		match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
766			Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
767			None => Ok(None),
768		}
769	}
770
771	async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
772		match self.get_wallet_vtxo(id).await? {
773			Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
774			None => Ok(false),
775		}
776	}
777
778	async fn update_vtxo_state_checked(
779		&self,
780		vtxo_id: VtxoId,
781		new_state: VtxoState,
782		allowed_old_states: &[VtxoStateKind],
783	) -> anyhow::Result<WalletVtxo> {
784		let mut lock = self.inner.write().await;
785		update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
786	}
787
788	async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
789		let vtxo_key = SerdeVtxoKey { index, public_key };
790		let record = Record::from_data(
791			partition::PUBLIC_KEY,
792			&public_key.serialize()[..],
793			Some(sort::SortKey::u64_desc(index as u64)),
794			&vtxo_key,
795		)?;
796		self.inner.write().await.put(record).await
797	}
798
799	async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
800		// pks are sorted descending, so the first one is the highest index
801		let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
802		let records = self.inner.read().await.query_sorted(query).await?;
803
804		match records.into_iter().next() {
805			Some(record) => {
806				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
807				Ok(Some(vtxo_key.index))
808			}
809			None => Ok(None),
810		}
811	}
812
813	async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
814		match self.inner.read().await
815			.get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
816		{
817			Some(record) => {
818				let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
819				Ok(Some(vtxo_key.index))
820			}
821			None => Ok(None),
822		}
823	}
824
825	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
826		match self.inner.read().await
827			.get(partition::MAILBOX_CHECKPOINT, &[]).await?
828		{
829			Some(record) => Ok(record.to_data::<u64>()?),
830			None => Ok(0),
831		}
832	}
833
834	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
835		let mut lock = self.inner.write().await;
836		let record = Record::from_data(
837			partition::MAILBOX_CHECKPOINT,
838			&[],
839			None,
840			&checkpoint,
841		)?;
842		lock.put(record).await?;
843		Ok(())
844	}
845
846	async fn upsert_wallet_action_checkpoint(
847		&self,
848		id: &WalletActionId,
849		checkpoint: &WalletActionCheckpoint,
850	) -> anyhow::Result<()> {
851		let record = Record::from_data(
852			partition::WALLET_ACTION_CHECKPOINT,
853			id.as_bytes(),
854			None,
855			checkpoint,
856		)?;
857		self.inner.write().await.put(record).await
858	}
859
860	async fn get_wallet_action_checkpoint(
861		&self,
862		id: &WalletActionId,
863	) -> anyhow::Result<Option<WalletActionCheckpoint>> {
864		match self.inner.read().await
865			.get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
866		{
867			Some(record) => Ok(Some(record.to_data()?)),
868			None => Ok(None),
869		}
870	}
871
872	async fn get_all_wallet_action_checkpoints(
873		&self,
874	) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
875		let records = self.inner.read().await
876			.get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
877		records.into_iter().map(|r| r.to_data()).collect()
878	}
879
880	async fn remove_wallet_action_checkpoint(
881		&self,
882		id: &WalletActionId,
883	) -> anyhow::Result<()> {
884		self.inner.write().await
885			.delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
886		Ok(())
887	}
888
889	async fn record_paid_invoice(
890		&self,
891		payment_hash: PaymentHash,
892		preimage: Preimage,
893	) -> anyhow::Result<()> {
894		let key = payment_hash.to_byte_array();
895		// Idempotent: preserve the original paid_at across retries.
896		let mut lock = self.inner.write().await;
897		if lock.get(partition::PAID_INVOICE, &key).await?.is_some() {
898			return Ok(());
899		}
900		let paid = PaidInvoice {
901			payment_hash,
902			preimage,
903			paid_at: chrono::Local::now(),
904		};
905		let record = Record::from_data(partition::PAID_INVOICE, &key, None, &paid)?;
906		lock.put(record).await
907	}
908
909	async fn get_paid_invoice(
910		&self,
911		payment_hash: PaymentHash,
912	) -> anyhow::Result<Option<PaidInvoice>> {
913		match self.inner.read().await
914			.get(partition::PAID_INVOICE, &payment_hash.to_byte_array()).await?
915		{
916			Some(record) => Ok(Some(record.to_data()?)),
917			None => Ok(None),
918		}
919	}
920
921	async fn store_lightning_receive(
922		&self,
923		payment_hash: PaymentHash,
924		preimage: Preimage,
925		invoice: &Bolt11Invoice,
926		htlc_recv_cltv_delta: BlockDelta,
927	) -> anyhow::Result<()> {
928		let lightning_receive = LightningReceive {
929			payment_hash,
930			payment_preimage: preimage,
931			invoice: invoice.clone(),
932			htlc_recv_cltv_delta,
933			htlc_vtxos: vec![],
934			movement_id: None,
935			finished_at: None,
936			preimage_revealed_at: None,
937		};
938
939		let record = Record::from_data(
940			partition::LIGHTNING_RECEIVE,
941			&payment_hash.to_byte_array(),
942			None,
943			&lightning_receive,
944		)?;
945		self.inner.write().await.put(record).await
946	}
947
948	async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
949		let records = self.inner.read().await
950			.get_all(partition::LIGHTNING_RECEIVE).await?;
951		records
952			.into_iter()
953			.filter_map(|r| {
954				let receive = r.to_data::<LightningReceive>().ok()?;
955				if receive.finished_at.is_none() {
956					Some(Ok(receive))
957				} else {
958					None
959				}
960			})
961			.collect()
962	}
963
964	async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
965		let mut lock = self.inner.write().await;
966
967		let pk = payment_hash.to_byte_array();
968		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
969			.context("lightning receive not found")?;
970		let mut lightning_receive: LightningReceive = record.to_data()?;
971
972		lightning_receive.preimage_revealed_at = Some(Local::now());
973
974		let updated_record = Record::from_data(
975			partition::LIGHTNING_RECEIVE,
976			&pk,
977			None,
978			&lightning_receive,
979		)?;
980		lock.put(updated_record).await
981	}
982
983	async fn update_lightning_receive(
984		&self,
985		payment_hash: PaymentHash,
986		vtxo_ids: &[VtxoId],
987		movement_id: MovementId,
988	) -> anyhow::Result<()> {
989		let mut lock = self.inner.write().await;
990		let pk = payment_hash.to_byte_array();
991		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
992			.context("lightning receive not found")?;
993		let mut lightning_receive: LightningReceive = record.to_data()?;
994
995		let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
996		for vtxo_id in vtxo_ids {
997			let vtxo = get_vtxo(&*lock, *vtxo_id).await?
998				.context("vtxo not found")?;
999			htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
1000		}
1001
1002		lightning_receive.htlc_vtxos = htlc_vtxos;
1003		lightning_receive.movement_id = Some(movement_id);
1004
1005		let updated_record = Record::from_data(
1006			partition::LIGHTNING_RECEIVE,
1007			&pk,
1008			None,
1009			&lightning_receive,
1010		)?;
1011		lock.put(updated_record).await
1012	}
1013
1014	async fn fetch_lightning_receive_by_payment_hash(
1015		&self,
1016		payment_hash: PaymentHash,
1017	) -> anyhow::Result<Option<LightningReceive>> {
1018		match self.inner.read().await
1019			.get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1020		{
1021			Some(record) => Ok(Some(record.to_data()?)),
1022			None => Ok(None),
1023		}
1024	}
1025
1026	async fn finish_pending_lightning_receive(
1027		&self,
1028		payment_hash: PaymentHash,
1029	) -> anyhow::Result<()> {
1030		let mut lock = self.inner.write().await;
1031		let pk = payment_hash.to_byte_array();
1032		let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1033			.context("lightning receive not found")?;
1034		let mut lightning_receive: LightningReceive = record.to_data()?;
1035
1036		lightning_receive.finished_at = Some(Local::now());
1037
1038		let updated_record = Record::from_data(
1039			partition::LIGHTNING_RECEIVE,
1040			&pk,
1041			None,
1042			&lightning_receive,
1043		)?;
1044		lock.put(updated_record).await
1045	}
1046
1047	async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1048		let record = Record::from_data(
1049			partition::PENDING_OFFBOARD,
1050			&pending.movement_id.to_bytes(),
1051			None,
1052			pending,
1053		)?;
1054		self.inner.write().await.put(record).await
1055	}
1056
1057	async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1058		let records = self.inner.read().await
1059			.get_all(partition::PENDING_OFFBOARD).await?;
1060		records.into_iter().map(|r| r.to_data()).collect()
1061	}
1062
1063	async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1064		self.inner.write().await
1065			.delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1066		Ok(())
1067	}
1068
1069	async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1070		let record = Record::from_data(
1071			partition::EXIT_VTXO,
1072			&exit.vtxo_id.to_bytes(),
1073			None,
1074			exit,
1075		)?;
1076		self.inner.write().await.put(record).await
1077	}
1078
1079	async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1080		self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1081		Ok(())
1082	}
1083
1084	async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1085		let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1086		records.into_iter().map(|r| r.to_data()).collect()
1087	}
1088
1089	async fn store_exit_child_tx(
1090		&self,
1091		exit_txid: Txid,
1092		child_tx: &Transaction,
1093		origin: ExitTxOrigin,
1094	) -> anyhow::Result<()> {
1095		let exit_child = SerdeExitChildTx {
1096			child_tx: child_tx.clone(),
1097			origin,
1098		};
1099		let record = Record::from_data(
1100			partition::EXIT_CHILD_TX,
1101			&exit_txid.to_byte_array(),
1102			None,
1103			&exit_child,
1104		)?;
1105		self.inner.write().await.put(record).await
1106	}
1107
1108	async fn get_exit_child_tx(
1109		&self,
1110		exit_txid: Txid,
1111	) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1112		match self.inner.read().await
1113			.get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1114		{
1115			Some(record) => {
1116				let exit_child = record.to_data::<SerdeExitChildTx>()?;
1117				Ok(Some((exit_child.child_tx, exit_child.origin)))
1118			}
1119			None => Ok(None),
1120		}
1121	}
1122}
1123
#[cfg(test)]
mod tests {
	use super::*;

	/// A full-range query keeps its partition and unbounded range, and
	/// `limit` records the requested maximum.
	#[test]
	fn storage_query_builder() {
		let built = Query::new_full_range(0).limit(10);

		assert_eq!(built.partition, 0);
		assert_eq!(built.limit, Some(10));
		assert_eq!(built.range, ..);
	}
}
1137
/// This module provides comprehensive tests for the methods of the
/// `StorageAdaptor` trait. Use these functions to validate custom implementations.
///
/// # Example
///
/// ```rust
/// use bark::persist::adaptor::test_suite;
///
/// #[tokio::test]
/// async fn test_my_custom_adaptor() {
///     let mut storage = MyCustomStorageAdaptor::new();
///     test_suite::run_all(&mut storage).await;
/// }
/// ```
1152#[cfg(test)]
1153pub mod test_suite {
1154	use super::*;
1155	use super::partition::LAST_IDS;
1156	use super::sort::SortKey;
1157
1158	async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1159		for partition in partitions {
1160			let records = storage.get_all(*partition).await?;
1161			for record in records {
1162				storage.delete(record.partition, &record.pk).await?;
1163			}
1164		}
1165		Ok(())
1166	}
1167
1168	/// Runs all test suites against the given storage adaptor.
1169	pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1170		// put tests
1171		test_put_insert(storage).await;
1172		test_put_upsert(storage).await;
1173		test_put_with_sort_key(storage).await;
1174		test_put_without_sort_key(storage).await;
1175		test_put_multiple_partitions(storage).await;
1176
1177		// get tests
1178		test_get_existing(storage).await;
1179		test_get_after_update(storage).await;
1180
1181		// delete tests
1182		test_delete_existing(storage).await;
1183		test_delete_nonexistent(storage).await;
1184		test_delete_idempotent(storage).await;
1185
1186		// query tests
1187		test_query_empty_partition(storage).await;
1188		test_query_returns_partition_records(storage).await;
1189		test_query_ordering(storage).await;
1190		test_query_with_limit(storage).await;
1191		test_query_null_sort_key_excluded(storage).await;
1192		test_query_partition_isolation(storage).await;
1193		test_query_range(storage).await;
1194		test_query_exclusive_end_range(storage).await;
1195		test_query_full_range_limit_one(storage).await;
1196
1197		// get_all tests
1198		test_get_all_empty_partition(storage).await;
1199		test_get_all_returns_all_records(storage).await;
1200		test_get_all_includes_records_without_sort_key(storage).await;
1201		test_get_all_partition_isolation(storage).await;
1202		test_get_all_after_delete(storage).await;
1203
1204		// incremental_id tests
1205		test_incremental_id_starts_at_one(storage).await;
1206		test_incremental_id_increments(storage).await;
1207		test_incremental_id_partition_isolation(storage).await;
1208		test_incremental_id_persists_across_operations(storage).await;
1209	}
1210
1211	/// Tests that put inserts a new record.
1212	pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1213		let record = Record {
1214			pk: "put_insert_1".into(),
1215			partition: 0,
1216			sort_key: None,
1217			data: b"test data".to_vec(),
1218		};
1219
1220		storage.put(record).await.expect("put should succeed");
1221
1222		let retrieved = storage
1223			.get(0, b"put_insert_1")
1224			.await
1225			.expect("get should succeed")
1226			.expect("record should exist");
1227
1228		assert_eq!(retrieved.pk, b"put_insert_1");
1229		assert_eq!(retrieved.partition, 0);
1230		assert_eq!(retrieved.data, b"test data");
1231	}
1232
1233	/// Tests that put updates an existing record (upsert behavior).
1234	pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1235		let record1 = Record {
1236			pk: b"put_upsert_1".into(),
1237			partition: 0,
1238			sort_key: None,
1239			data: b"original".to_vec(),
1240		};
1241		storage.put(record1).await.expect("first put should succeed");
1242
1243		let record2 = Record {
1244			pk: "put_upsert_1".into(),
1245			partition: 0,
1246			sort_key: None,
1247			data: b"updated".to_vec(),
1248		};
1249		storage
1250			.put(record2)
1251			.await
1252			.expect("second put should succeed");
1253
1254		let retrieved = storage
1255			.get(0, b"put_upsert_1")
1256			.await
1257			.expect("get should succeed")
1258			.expect("record should exist");
1259
1260		assert_eq!(retrieved.data, b"updated", "data should be updated");
1261	}
1262
1263	/// Tests that put correctly stores the sort key.
1264	pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1265		let sort_key = SortKey::u32_asc(42);
1266		let record = Record {
1267			pk: b"put_sort_key_1".into(),
1268			partition: 0,
1269			sort_key: Some(sort_key.clone()),
1270			data: b"with sort key".to_vec(),
1271		};
1272
1273		storage.put(record).await.expect("put should succeed");
1274
1275		let retrieved = storage
1276			.get(0, b"put_sort_key_1")
1277			.await
1278			.expect("get should succeed")
1279			.expect("record should exist");
1280
1281		assert_eq!(retrieved.sort_key, Some(sort_key));
1282	}
1283
1284	/// Tests that put correctly handles records without sort keys.
1285	pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1286		let record = Record {
1287			pk: b"put_no_sort_key_1".into(),
1288			partition: 0,
1289			sort_key: None,
1290			data: b"no sort key".to_vec(),
1291		};
1292
1293		storage.put(record).await.expect("put should succeed");
1294
1295		let retrieved = storage
1296			.get(0, b"put_no_sort_key_1")
1297			.await
1298			.expect("get should succeed")
1299			.expect("record should exist");
1300
1301		assert!(retrieved.sort_key.is_none());
1302	}
1303
1304	/// Tests that put correctly handles multiple partitions.
1305	pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1306		let record_a = Record {
1307			pk: "put_multi_a".into(),
1308			partition: 0,
1309			sort_key: None,
1310			data: b"in partition a".to_vec(),
1311		};
1312		let record_b = Record {
1313			pk: "put_multi_b".into(),
1314			partition: 1,
1315			sort_key: None,
1316			data: b"in partition b".to_vec(),
1317		};
1318
1319		storage.put(record_a).await.expect("put a should succeed");
1320		storage.put(record_b).await.expect("put b should succeed");
1321
1322		let retrieved_a = storage
1323			.get(0, b"put_multi_a")
1324			.await
1325			.expect("get should succeed")
1326			.expect("record a should exist");
1327		let retrieved_b = storage
1328			.get(1, b"put_multi_b")
1329			.await
1330			.expect("get should succeed")
1331			.expect("record b should exist");
1332
1333		assert_eq!(retrieved_a.partition, 0);
1334		assert_eq!(retrieved_b.partition, 1);
1335	}
1336
1337	/// Tests that get returns an existing record.
1338	pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1339		let record = Record {
1340			pk: b"get_existing_1".into(),
1341			partition: 0,
1342			sort_key: Some(SortKey::u32_asc(100)),
1343			data: b"test".to_vec(),
1344		};
1345		storage.put(record).await.expect("put should succeed");
1346
1347		let retrieved = storage
1348			.get(0, b"get_existing_1")
1349			.await
1350			.expect("get should succeed");
1351
1352		assert!(retrieved.is_some());
1353		let retrieved = retrieved.unwrap();
1354		assert_eq!(retrieved.pk, b"get_existing_1");
1355		assert_eq!(retrieved.partition, 0);
1356		assert_eq!(retrieved.data, b"test");
1357
1358		// superset of the key
1359		assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1360		// subset of the key
1361		assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1362
1363		// non-existent key
1364		assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1365	}
1366
1367	/// Tests that get returns updated data after put.
1368	pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1369		let record1 = Record {
1370			pk: b"get_after_update_1".into(),
1371			partition: 0,
1372			sort_key: None,
1373			data: b"version1".to_vec(),
1374		};
1375		storage.put(record1).await.expect("put should succeed");
1376
1377		let record2 = Record {
1378			pk: b"get_after_update_1".into(),
1379			partition: 0,
1380			sort_key: None,
1381			data: b"version2".to_vec(),
1382		};
1383		storage.put(record2).await.expect("put should succeed");
1384
1385		let retrieved = storage
1386			.get(0, b"get_after_update_1")
1387			.await
1388			.expect("get should succeed")
1389			.expect("record should exist");
1390
1391		assert_eq!(retrieved.data, b"version2");
1392	}
1393
1394	/// Tests that delete removes an existing record and returns it.
1395	pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1396		let record = Record {
1397			pk: b"delete_existing_1".into(),
1398			partition: 0,
1399			sort_key: None,
1400			data: b"to delete".to_vec(),
1401		};
1402		storage.put(record.clone()).await.expect("put should succeed");
1403
1404		let deleted_record = storage
1405			.delete(0, b"delete_existing_1")
1406			.await
1407			.expect("delete should succeed");
1408
1409		assert_eq!(deleted_record, Some(record));
1410
1411		let retrieved = storage
1412			.get(0, b"delete_existing_1")
1413			.await
1414			.expect("get should succeed");
1415		assert!(retrieved.is_none(), "record should no longer exist");
1416	}
1417
1418	/// Tests that delete returns None for non-existent records.
1419	pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1420		let deleted_record = storage
1421			.delete(0, b"delete_nonexistent_does_not_exist")
1422			.await
1423			.expect("delete should succeed");
1424
1425		assert!(
1426			deleted_record.is_none(),
1427			"delete should return None for non-existent record"
1428		);
1429	}
1430
1431	/// Tests that delete is idempotent (second delete returns None).
1432	pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1433		let record = Record {
1434			pk: b"delete_idempotent_1".into(),
1435			partition: 0,
1436			sort_key: None,
1437			data: b"delete twice".to_vec(),
1438		};
1439		storage.put(record.clone()).await.expect("put should succeed");
1440
1441		let first_delete = storage
1442			.delete(0, b"delete_idempotent_1")
1443			.await
1444			.expect("first delete should succeed");
1445		let second_delete = storage
1446			.delete(0, b"delete_idempotent_1")
1447			.await
1448			.expect("second delete should succeed");
1449
1450		assert_eq!(first_delete, Some(record), "first delete should return the record");
1451		assert_eq!(second_delete, None, "second delete should return None");
1452	}
1453
1454	/// Tests that query returns empty results for empty partition.
1455	pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1456		clear_partitions(storage, &[0]).await.unwrap();
1457		let results = storage
1458			.query_sorted(Query::new_full_range(0))
1459			.await
1460			.expect("query should succeed");
1461
1462		assert!(results.is_empty());
1463	}
1464
1465	/// Tests that query returns all records in a partition.
1466	pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1467		clear_partitions(storage, &[0]).await.unwrap();
1468		for i in 0..3 {
1469			let record = Record {
1470				pk: format!("query_partition_{}", i).into(),
1471				partition: 0,
1472				sort_key: Some(SortKey::u32_asc(i)),
1473				data: format!("record_{}", i).as_bytes().to_vec(),
1474			};
1475			storage.put(record).await.expect("put should succeed");
1476		}
1477
1478		let results = storage
1479			.query_sorted(Query::new_full_range(0))
1480			.await
1481			.expect("query should succeed");
1482
1483		assert_eq!(results.len(), 3);
1484	}
1485
1486	/// Tests that query returns records in ascending sort key order by default.
1487	pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1488		clear_partitions(storage, &[0]).await.unwrap();
1489		// Insert in non-sequential order
1490		for i in [5, 2, 8, 1, 9] {
1491			let record = Record {
1492				pk: format!("query_asc_{}", i).into(),
1493				partition: 0,
1494				sort_key: Some(SortKey::u32_asc(i)),
1495				data: format!("record_{}", i).as_bytes().to_vec(),
1496			};
1497			storage.put(record).await.expect("put should succeed");
1498		}
1499
1500		let results = storage
1501			.query_sorted(Query::new_full_range(0))
1502			.await
1503			.expect("query should succeed");
1504
1505		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1506		assert_eq!(
1507			values,
1508			vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1509			"should be in ascending order"
1510		);
1511	}
1512
1513	/// Tests that query with limit returns at most N records.
1514	pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1515		clear_partitions(storage, &[0]).await.unwrap();
1516		for i in 0..10 {
1517			let record = Record {
1518				pk: format!("query_limit_{}", i).into(),
1519				partition: 0,
1520				sort_key: Some(SortKey::u32_asc(i)),
1521				data: format!("record_{}", i).as_bytes().to_vec(),
1522			};
1523			storage.put(record).await.expect("put should succeed");
1524		}
1525
1526		let results = storage
1527			.query_sorted(Query::new_full_range(0).limit(3))
1528			.await
1529			.expect("query should succeed");
1530
1531		assert_eq!(results.len(), 3);
1532		let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1533		assert_eq!(
1534			values,
1535			vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1536			"should return first 3 records"
1537		);
1538	}
1539
1540	/// Tests that records without sort keys don't appear in query
1541	pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1542		clear_partitions(storage, &[0]).await.unwrap();
1543		// Records with sort keys
1544		let with_key_1 = Record {
1545			pk: "query_null_with_1".into(),
1546			partition: 0,
1547			sort_key: Some(SortKey::u32_asc(1)),
1548			data: b"with_key_1".to_vec(),
1549		};
1550		let with_key_2 = Record {
1551			pk: "query_null_with_2".into(),
1552			partition: 0,
1553			sort_key: Some(SortKey::u32_asc(2)),
1554			data: b"with_key_2".to_vec(),
1555		};
1556
1557		// Record without sort key
1558		let without_key = Record {
1559			pk: "query_null_without".into(),
1560			partition: 0,
1561			sort_key: None,
1562			data: b"no_key".to_vec(),
1563		};
1564
1565		storage.put(with_key_1).await.expect("put should succeed");
1566		storage.put(without_key).await.expect("put should succeed");
1567		storage.put(with_key_2).await.expect("put should succeed");
1568
1569		// query should only return records with sort keys
1570		let results_query = storage.query_sorted(Query::new_full_range(0)).await
1571			.expect("query should succeed");
1572		assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1573		assert_eq!(results_query[0].data, b"with_key_1");
1574		assert_eq!(results_query[1].data, b"with_key_2");
1575
1576		// get_all should return all records
1577		let results_all = storage.get_all(0).await
1578			.expect("get_all should succeed");
1579		assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1580
1581		// Verify all three records are present (order not guaranteed for get_all)
1582		let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1583		let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1584		let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1585		assert!(has_with_key_1, "get_all should include with_key_1");
1586		assert!(has_with_key_2, "get_all should include with_key_2");
1587		assert!(has_without_key, "get_all should include record without sort key");
1588	}
1589
1590	/// Tests that query only returns records from the specified partition.
1591	pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1592		clear_partitions(storage, &[0, 1]).await.unwrap();
1593		// Records in partition A
1594		for i in 0..3 {
1595			let record = Record {
1596				pk: format!("query_iso_a_{}", i).into(),
1597				partition: 0,
1598				sort_key: Some(SortKey::u32_asc(i)),
1599				data: format!("record_{}", i).as_bytes().to_vec(),
1600			};
1601			storage.put(record).await.expect("put should succeed");
1602		}
1603
1604		// Records in partition B
1605		for i in 0..5 {
1606			let record = Record {
1607				pk: format!("query_iso_b_{}", i).into(),
1608				partition: 1,
1609				sort_key: Some(SortKey::u32_asc(i)),
1610				data: format!("record_{}", i + 100).as_bytes().to_vec(),
1611			};
1612			storage.put(record).await.expect("put should succeed");
1613		}
1614
1615		let results_a = storage
1616			.query_sorted(Query::new_full_range(0))
1617			.await
1618			.expect("query should succeed");
1619
1620		let results_b = storage
1621			.query_sorted(Query::new_full_range(1))
1622			.await
1623			.expect("query should succeed");
1624
1625		assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1626		assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1627
1628		// Verify all results_a are from partition A
1629		assert!(results_a
1630			.iter()
1631			.all(|r| r.partition == 0));
1632
1633		// Verify all results_b are from partition B
1634		assert!(results_b
1635			.iter()
1636			.all(|r| r.partition == 1));
1637	}
1638
1639	/// Tests that query with start and end keys returns only records within the range.
1640	pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1641		clear_partitions(storage, &[0]).await.unwrap();
1642
1643		// Insert records with sort keys 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
1644		for i in 1..=10u32 {
1645			let record = Record {
1646				pk: format!("query_range_{}", i).into(),
1647				partition: 0,
1648				sort_key: Some(SortKey::u32_asc(i)),
1649				data: format!("record_{}", i).as_bytes().to_vec(),
1650			};
1651			storage.put(record).await.expect("put should succeed");
1652		}
1653
1654		// Query with start key only (>= 5)
1655		let results_start = storage
1656			.query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1657			.await
1658			.expect("query should succeed");
1659
1660		assert_eq!(results_start.len(), 6, "should return records 5-10");
1661		let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1662		assert_eq!(
1663			values,
1664			vec![
1665				b"record_5".to_vec(),
1666				b"record_6".to_vec(),
1667				b"record_7".to_vec(),
1668				b"record_8".to_vec(),
1669				b"record_9".to_vec(),
1670				b"record_10".to_vec(),
1671			],
1672			"should return records from 5 onwards"
1673		);
1674
1675		// Query with end key only (<= 3)
1676		let results_end = storage
1677			.query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1678			.await
1679			.expect("query should succeed");
1680
1681		assert_eq!(results_end.len(), 3, "should return records 1-3");
1682		let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1683		assert_eq!(
1684			values,
1685			vec![
1686				b"record_1".to_vec(),
1687				b"record_2".to_vec(),
1688				b"record_3".to_vec(),
1689			],
1690			"should return records up to 3"
1691		);
1692
1693		// Query with both start and end keys (3 <= x <= 7)
1694		let results_range = storage
1695			.query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1696			.await
1697			.expect("query should succeed");
1698
1699		assert_eq!(results_range.len(), 5, "should return records 3-7");
1700		let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1701		assert_eq!(
1702			values,
1703			vec![
1704				b"record_3".to_vec(),
1705				b"record_4".to_vec(),
1706				b"record_5".to_vec(),
1707				b"record_6".to_vec(),
1708				b"record_7".to_vec(),
1709			],
1710			"should return records in range 3-7"
1711		);
1712
1713		// Query range with limit
1714		let results_range_limit = storage
1715			.query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1716			.await
1717			.expect("query should succeed");
1718
1719		assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1720		let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1721		assert_eq!(
1722			values,
1723			vec![
1724				b"record_2".to_vec(),
1725				b"record_3".to_vec(),
1726				b"record_4".to_vec(),
1727			],
1728			"should return first 3 records in range"
1729		);
1730
1731		// Query range that matches no records
1732		let results_empty = storage
1733			.query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1734			.await
1735			.expect("query should succeed");
1736
1737		assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1738	}
1739
1740	/// Tests that exclusive end ranges (start..end) exclude the end bound.
1741	pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1742		clear_partitions(storage, &[0]).await.unwrap();
1743
1744		for i in 1..=10u32 {
1745			let record = Record {
1746				pk: format!("query_excl_{}", i).into(),
1747				partition: 0,
1748				sort_key: Some(SortKey::u32_asc(i)),
1749				data: format!("record_{}", i).as_bytes().to_vec(),
1750			};
1751			storage.put(record).await.expect("put should succeed");
1752		}
1753
1754		// Exclusive end: 3 <= x < 7 (should return records 3, 4, 5, 6)
1755		let results = storage
1756			.query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1757			.await
1758			.expect("query should succeed");
1759
1760		assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1761		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1762		assert_eq!(
1763			values,
1764			vec![
1765				b"record_3".to_vec(),
1766				b"record_4".to_vec(),
1767				b"record_5".to_vec(),
1768				b"record_6".to_vec(),
1769			],
1770		);
1771
1772		// Exclusive end only: x < 4 (should return records 1, 2, 3)
1773		let results = storage
1774			.query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1775			.await
1776			.expect("query should succeed");
1777
1778		assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1779		let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1780		assert_eq!(
1781			values,
1782			vec![
1783				b"record_1".to_vec(),
1784				b"record_2".to_vec(),
1785				b"record_3".to_vec(),
1786			],
1787		);
1788	}
1789
1790	/// Tests that full-range query with limit 1 returns the first sorted record.
1791	pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1792		clear_partitions(storage, &[0]).await.unwrap();
1793
1794		for i in [5u32, 2, 8, 1, 9] {
1795			let record = Record {
1796				pk: format!("query_limit1_{}", i).into(),
1797				partition: 0,
1798				sort_key: Some(SortKey::u32_asc(i)),
1799				data: format!("record_{}", i).as_bytes().to_vec(),
1800			};
1801			storage.put(record).await.expect("put should succeed");
1802		}
1803
1804		let results = storage
1805			.query_sorted(Query::new_full_range(0).limit(1))
1806			.await
1807			.expect("query should succeed");
1808
1809		assert_eq!(results.len(), 1);
1810		assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1811	}
1812
1813	/// Tests that incremental_id returns 1 for the first call on a partition.
1814	pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1815		// Clear the LAST_IDS entry for partition 0
1816		storage.delete(LAST_IDS, b"0").await.unwrap();
1817		let id = storage.incremental_id(0).await
1818			.expect("incremental_id should succeed");
1819
1820		assert_eq!(id, 1, "first id should be 1");
1821	}
1822
1823	/// Tests that incremental_id increments on subsequent calls.
1824	pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1825		clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1826
1827		let id1 = storage.incremental_id(0).await
1828			.expect("incremental_id should succeed");
1829		let id2 = storage.incremental_id(0).await
1830			.expect("incremental_id should succeed");
1831		let id3 = storage.incremental_id(0).await
1832			.expect("incremental_id should succeed");
1833
1834		assert_eq!(id1, 1, "first id should be 1");
1835		assert_eq!(id2, 2, "second id should be 2");
1836		assert_eq!(id3, 3, "third id should be 3");
1837	}
1838
1839	/// Tests that incremental_id maintains separate sequences for different partitions.
1840	pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1841		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1842
1843		// Generate IDs for partition A
1844		let a1 = storage.incremental_id(0).await
1845			.expect("incremental_id should succeed");
1846		let a2 = storage.incremental_id(0).await
1847			.expect("incremental_id should succeed");
1848		let a3 = storage.incremental_id(0).await
1849			.expect("incremental_id should succeed");
1850
1851		// Generate IDs for partition B
1852		let b1 = storage.incremental_id(1).await
1853			.expect("incremental_id should succeed");
1854		let b2 = storage.incremental_id(1).await
1855			.expect("incremental_id should succeed");
1856
1857		// Partition A should have 1, 2, 3
1858		assert_eq!(a1, 1);
1859		assert_eq!(a2, 2);
1860		assert_eq!(a3, 3);
1861
1862		// Partition B should have its own sequence starting at 1
1863		assert_eq!(b1, 1);
1864		assert_eq!(b2, 2);
1865
1866		// Generate more for A - should continue from 3
1867		let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1868		assert_eq!(a4, 4);
1869	}
1870
1871	/// Tests that incremental_id persists its state correctly.
1872	pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1873		clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1874
1875		// Generate some IDs
1876		let id1 = storage.incremental_id(0).await
1877			.expect("incremental_id should succeed");
1878		let id2 = storage.incremental_id(0).await
1879			.expect("incremental_id should succeed");
1880		assert_eq!(id1, 1);
1881		assert_eq!(id2, 2);
1882
1883		// Verify the stored value can be retrieved directly
1884		let stored = storage
1885			.get(LAST_IDS, &[0])
1886			.await
1887			.expect("get should succeed")
1888			.expect("id record should exist");
1889		let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1890		assert_eq!(stored_id, 2, "stored id should be 2");
1891
1892		// Continue generating - should pick up where we left off
1893		let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1894		assert_eq!(id3, 3);
1895	}
1896
1897	/// Tests that get_all returns empty results for empty partition.
1898	pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1899		clear_partitions(storage, &[0]).await.unwrap();
1900		let results = storage
1901			.get_all(0)
1902			.await
1903			.expect("get_all should succeed");
1904
1905		assert!(results.is_empty(), "get_all should return empty for empty partition");
1906	}
1907
1908	/// Tests that get_all returns all records in a partition.
1909	pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1910		clear_partitions(storage, &[0]).await.unwrap();
1911
1912		// Insert records with sort keys
1913		for i in 0..5 {
1914			let record = Record {
1915				pk: format!("get_all_{}", i).into(),
1916				partition: 0,
1917				sort_key: Some(SortKey::u32_asc(i)),
1918				data: format!("record_{}", i).as_bytes().to_vec(),
1919			};
1920			storage.put(record).await.expect("put should succeed");
1921		}
1922
1923		let results = storage
1924			.get_all(0)
1925			.await
1926			.expect("get_all should succeed");
1927
1928		assert_eq!(results.len(), 5, "get_all should return all 5 records");
1929
1930		// Verify all records are present (order not guaranteed)
1931		for i in 0..5 {
1932			let expected_data = format!("record_{}", i).as_bytes().to_vec();
1933			let found = results.iter().any(|r| r.data == expected_data);
1934			assert!(found, "get_all should include record_{}", i);
1935		}
1936	}
1937
1938	/// Tests that get_all includes records without sort keys.
1939	pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1940		clear_partitions(storage, &[0]).await.unwrap();
1941
1942		// Insert records with sort keys
1943		let with_key_1 = Record {
1944			pk: "get_all_with_1".into(),
1945			partition: 0,
1946			sort_key: Some(SortKey::u32_asc(1)),
1947			data: b"with_key_1".to_vec(),
1948		};
1949		let with_key_2 = Record {
1950			pk: "get_all_with_2".into(),
1951			partition: 0,
1952			sort_key: Some(SortKey::u32_asc(2)),
1953			data: b"with_key_2".to_vec(),
1954		};
1955
1956		// Insert records without sort keys
1957		let without_key_1 = Record {
1958			pk: "get_all_without_1".into(),
1959			partition: 0,
1960			sort_key: None,
1961			data: b"without_key_1".to_vec(),
1962		};
1963		let without_key_2 = Record {
1964			pk: "get_all_without_2".into(),
1965			partition: 0,
1966			sort_key: None,
1967			data: b"without_key_2".to_vec(),
1968		};
1969
1970		storage.put(with_key_1).await.expect("put should succeed");
1971		storage.put(without_key_1).await.expect("put should succeed");
1972		storage.put(with_key_2).await.expect("put should succeed");
1973		storage.put(without_key_2).await.expect("put should succeed");
1974
1975		let results = storage
1976			.get_all(0)
1977			.await
1978			.expect("get_all should succeed");
1979
1980		assert_eq!(results.len(), 4, "get_all should return all 4 records");
1981
1982		// Verify all records are present
1983		let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1984		let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1985		let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1986		let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1987
1988		assert!(has_with_1, "get_all should include with_key_1");
1989		assert!(has_with_2, "get_all should include with_key_2");
1990		assert!(has_without_1, "get_all should include without_key_1");
1991		assert!(has_without_2, "get_all should include without_key_2");
1992
1993		// Verify that query excludes records without sort keys
1994		let query_results = storage
1995			.query_sorted(Query::new_full_range(0))
1996			.await
1997			.expect("query should succeed");
1998
1999		assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
2000		let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
2001		assert!(!query_has_without, "query should not include records without sort keys");
2002	}
2003
2004	/// Tests that get_all only returns records from the specified partition.
2005	pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2006		clear_partitions(storage, &[0, 1]).await.unwrap();
2007
2008		// Insert records in partition 0
2009		for i in 0..3 {
2010			let record = Record {
2011				pk: format!("partition_0_{}", i).into(),
2012				partition: 0,
2013				sort_key: Some(SortKey::u32_asc(i)),
2014				data: format!("p0_record_{}", i).as_bytes().to_vec(),
2015			};
2016			storage.put(record).await.expect("put should succeed");
2017		}
2018
2019		// Insert records in partition 1
2020		for i in 0..2 {
2021			let record = Record {
2022				pk: format!("partition_1_{}", i).into(),
2023				partition: 1,
2024				sort_key: Some(SortKey::u32_asc(i)),
2025				data: format!("p1_record_{}", i).as_bytes().to_vec(),
2026			};
2027			storage.put(record).await.expect("put should succeed");
2028		}
2029
2030		// get_all partition 0
2031		let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2032		assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2033		assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2034
2035		// get_all partition 1
2036		let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2037		assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2038		assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2039	}
2040
2041	/// Tests that get_all reflects deletions.
2042	pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2043		clear_partitions(storage, &[0]).await.unwrap();
2044
2045		for i in 0..3u32 {
2046			let record = Record {
2047				pk: format!("get_all_del_{}", i).into(),
2048				partition: 0,
2049				sort_key: Some(SortKey::u32_asc(i)),
2050				data: format!("record_{}", i).as_bytes().to_vec(),
2051			};
2052			storage.put(record).await.expect("put should succeed");
2053		}
2054
2055		storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2056
2057		let results = storage.get_all(0).await.expect("get_all should succeed");
2058		assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2059
2060		let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2061		assert!(!has_deleted, "deleted record should not appear");
2062	}
2063}