1mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71 Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75 LightningReceive, PaidInvoice, PendingBoard, PendingOffboard,
76 RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77 StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
84pub mod partition {
85 pub const PROPERTIES: u8 = 0;
86 #[allow(unused)]
87 pub const BDK_CHANGESET: u8 = 1;
88 pub const VTXO: u8 = 2;
89 pub const PUBLIC_KEY: u8 = 3;
90 pub const PENDING_BOARD: u8 = 4;
91 pub const ROUND_STATE: u8 = 5;
92 pub const MOVEMENT: u8 = 6;
93 #[allow(unused)]
95 pub const LEGACY_LIGHTNING_SEND: u8 = 7;
96 pub const LIGHTNING_RECEIVE: u8 = 8;
97 pub const EXIT_VTXO: u8 = 9;
98 pub const EXIT_CHILD_TX: u8 = 10;
99 pub const MAILBOX_CHECKPOINT: u8 = 11;
100 pub const PENDING_OFFBOARD: u8 = 12;
101 pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
103 pub const WALLET_ACTION_CHECKPOINT: u8 = 14;
105 pub const PAID_INVOICE: u8 = 15;
108
109 pub const LAST_IDS: u8 = u8::MAX;
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114pub struct Record {
115 pub partition: u8,
119
120 pub pk: Vec<u8>,
122
123 pub sort_key: Option<SortKey>,
132
133 pub data: Vec<u8>,
135}
136
137impl Record {
138 fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
140 serde_json::from_slice(&self.data).map_err(Into::into)
141 }
142
143 fn from_data<T: Serialize>(
145 partition: u8,
146 pk: &[u8],
147 sort_key: Option<SortKey>,
148 data: &T,
149 ) -> anyhow::Result<Record> {
150 Ok(Record {
151 partition,
152 pk: pk.to_vec(),
153 sort_key,
154 data: serde_json::to_vec(data)?,
155 })
156 }
157}
158
159pub trait QueryRange: RangeBounds<SortKey> + Send {}
161
162impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
163
164#[derive(Debug, Clone)]
166pub struct Query<R: QueryRange> {
167 pub partition: u8,
169
170 pub range: R,
172
173 pub limit: Option<usize>,
175}
176
177impl<R: QueryRange> Query<R> {
178 pub fn new(partition: u8, range: R) -> Self {
180 Self {
181 partition,
182 range,
183 limit: None,
184 }
185 }
186
187 pub fn limit(mut self, limit: usize) -> Self {
192 self.limit = Some(limit);
193 self
194 }
195}
196
197impl Query<std::ops::RangeFull> {
198 pub fn new_full_range(partition: u8) -> Self {
199 Self::new(partition, ..)
200 }
201}
202
203fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
204 let body = pm.value_string();
205
206 let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
207 buf.extend(pm.type_str().as_bytes().iter().copied());
208 buf.push(0xfe);
211 buf.extend(body.into_bytes());
212 buf
213}
214
215#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
263pub trait StorageAdaptor: Send + Sync + 'static {
264 async fn put(&mut self, record: Record) -> anyhow::Result<()>;
266
267 async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272 async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
276
277 async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
283 -> anyhow::Result<Vec<Record>>;
284
285 async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
289
290 async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
292 let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
293 .map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
294 let next_partition_id = last_partition_id + 1;
295
296 let record = Record::from_data(
297 partition::LAST_IDS,
298 &[partition],
299 None,
300 &next_partition_id,
301 )?;
302
303 self.put(record).await?;
304 Ok(next_partition_id)
305 }
306}
307
308async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
309 match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
310 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
311 None => Ok(None),
312 }
313}
314
315async fn get_check_vtxo_state<S: StorageAdaptor>(
316 adaptor: &S,
317 vtxo_id: VtxoId,
318 allowed_states: &[VtxoStateKind],
319) -> anyhow::Result<SerdeVtxo> {
320 let vtxo = get_vtxo(adaptor, vtxo_id).await?
321 .context("vtxo not found")?;
322
323 let current_state = vtxo.current_state().context("vtxo has no state")?;
324 if !allowed_states.contains(¤t_state.kind()) {
325 bail!("current state {:?} not in allowed states {:?}",
326 current_state.kind(), allowed_states
327 );
328 }
329
330 Ok(vtxo)
331}
332
333async fn update_vtxo_state_checked<S: StorageAdaptor>(
334 adaptor: &mut S,
335 vtxo_id: VtxoId,
336 new_state: VtxoState,
337 allowed_old_states: &[VtxoStateKind],
338) -> anyhow::Result<WalletVtxo> {
339 let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
340
341 let sk = sort::vtxo_sort_key(
342 new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
343 );
344
345 serde_vtxo.states.push(new_state.clone());
346 let updated_record = Record::from_data(
347 partition::VTXO,
348 &vtxo_id.to_bytes(),
349 Some(sk),
350 &serde_vtxo,
351 )?;
352
353 adaptor.put(updated_record).await?;
354
355 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
356}
357
358pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
359 inner: tokio::sync::RwLock<S>,
360}
361
362impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
363 pub fn new(inner: S) -> Self {
364 Self {
365 inner: tokio::sync::RwLock::new(inner),
366 }
367 }
368}
369
370#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
372#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
373impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
374 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
375 let record = Record::from_data(
376 partition::PROPERTIES,
377 &[],
379 None,
380 properties,
381 )?;
382 self.inner.write().await.put(record).await
383 }
384
385 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
386 match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
387 Some(record) => Ok(Some(record.to_data()?)),
388 None => Ok(None),
389 }
390 }
391
392 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
393 let mut properties = match self.read_properties().await? {
394 Some(properties) => properties,
395 None => bail!("wallet not initialized"),
396 };
397
398 properties.server_pubkey = Some(server_pubkey);
399
400 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
401 self.inner.write().await.put(record).await
402 }
403
404 async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
405 let mut properties = match self.read_properties().await? {
406 Some(properties) => properties,
407 None => bail!("wallet not initialized"),
408 };
409
410 properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
411
412 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
413 self.inner.write().await.put(record).await
414 }
415
416 #[cfg(feature = "onchain-bdk")]
417 async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
418 match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
419 Some(record) => record.to_data(),
420 None => Ok(ChangeSet::default()),
421 }
422 }
423
424 #[cfg(feature = "onchain-bdk")]
425 async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
426 let mut current = self.initialize_bdk_wallet().await?;
427 current.merge(changeset.clone());
428
429 let record = Record::from_data(
430 partition::BDK_CHANGESET,
431 &[],
433 None,
434 ¤t,
435 )?;
436 self.inner.write().await.put(record).await
437 }
438
439 async fn create_new_movement(
440 &self,
441 status: MovementStatus,
442 subsystem: &MovementSubsystem,
443 time: DateTime<Local>,
444 ) -> anyhow::Result<MovementId> {
445 let mut lock = self.inner.write().await;
446
447 let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
448 let movement = Movement::new(id, status, subsystem, time);
449
450 let record = Record::from_data(
451 partition::MOVEMENT,
452 &id.to_bytes(),
453 Some(sort::movement_sort_key(&time)),
454 &movement,
455 )?;
456 lock.put(record).await?;
457
458 Ok(id)
459 }
460
461 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
462 let mut guard = self.inner.write().await;
463
464 let record = Record::from_data(
465 partition::MOVEMENT,
466 &movement.id.to_bytes(),
467 Some(sort::movement_sort_key(&movement.time.created_at)),
468 movement,
469 )?;
470 guard.put(record).await?;
471
472 let sent = movement.sent_to.iter().map(|d| &d.destination);
474 let rcvd = movement.received_on.iter().map(|d| &d.destination);
475 for pm in sent.chain(rcvd) {
476 let pm_bytes = serialize_payment_method(pm);
477 let primary_key = {
478 let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
480 buf.extend(pm_bytes.iter().copied());
481 buf.extend(movement.id.to_bytes());
482 buf
483 };
484 let record = Record::from_data(
485 partition::MOVEMENT_PAYMENT_METHOD,
486 &primary_key,
487 Some(SortKey::from_bytes(pm_bytes)),
488 &movement.id.0,
489 )?;
490 guard.put(record).await?;
491 }
492
493 Ok(())
494 }
495
496 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
497 self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
498 .await?
499 .context("movement not found")?
500 .to_data()
501 }
502
503 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
504 let records = self.inner.read().await
505 .query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
506 records.into_iter().map(|r| r.to_data()).collect()
507 }
508
509 async fn get_movements_by_payment_method(
510 &self,
511 payment_method: &PaymentMethod,
512 ) -> anyhow::Result<Vec<Movement>> {
513 let pm_bytes = serialize_payment_method(payment_method);
514 let sort_key = SortKey::from_bytes(pm_bytes);
515
516 let guard = self.inner.read().await;
517 let idx_recs = guard.query_sorted(Query::new(
518 partition::MOVEMENT_PAYMENT_METHOD,
519 sort_key.clone()..=sort_key,
520 )).await?;
521
522 let mut ret = Vec::with_capacity(idx_recs.len());
523 for idx_rec in idx_recs {
524 let id = MovementId::new(idx_rec.to_data::<u32>()
525 .context("corrupt db: movement payment method index value")?);
526
527 ret.push(
528 guard.get(partition::MOVEMENT, &id.to_bytes()).await?
529 .context("corrupt db: movement payment method entry for unknown movement")?
530 .to_data()
531 .context("corrupt db: invalid movement record")?
532 );
533 }
534 Ok(ret)
535 }
536
537 async fn store_pending_board(
538 &self,
539 vtxo: &Vtxo<Full>,
540 funding_tx: &Transaction,
541 movement_id: MovementId,
542 ) -> anyhow::Result<()> {
543 let pending_board = PendingBoard {
544 vtxos: vec![vtxo.id()],
545 amount: vtxo.amount(),
546 funding_tx: funding_tx.clone(),
547 movement_id,
548 };
549
550 let record = Record::from_data(
551 partition::PENDING_BOARD,
552 &vtxo.id().to_bytes(),
553 None,
554 &pending_board,
555 )?;
556
557 self.inner.write().await.put(record).await
558 }
559
560 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
561 self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
562 Ok(())
563 }
564
565 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
566 let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
567 records
568 .into_iter()
569 .map(|r| {
570 let board = r.to_data::<PendingBoard>()?;
571 Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
572 })
573 .collect()
574 }
575
576 async fn get_pending_board_by_vtxo_id(
577 &self,
578 vtxo_id: VtxoId,
579 ) -> anyhow::Result<Option<PendingBoard>> {
580 match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
581 Some(record) => Ok(Some(record.to_data()?)),
582 None => Ok(None),
583 }
584 }
585
586 async fn store_round_state(&self, round_state: &RoundState) -> anyhow::Result<RoundStateId> {
587 let mut lock = self.inner.write().await;
588
589 let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
590 let serde_state = SerdeRoundState::from(round_state);
591 let record = Record::from_data(
592 partition::ROUND_STATE,
593 &id.to_bytes(),
594 Some(sort::SortKey::u32_asc(id.0)),
595 &serde_state,
596 )?;
597 lock.put(record).await?;
598
599 Ok(id)
600 }
601
602 async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
603 let serde_state = SerdeRoundState::from(round_state.state());
604 let record = Record::from_data(
605 partition::ROUND_STATE,
606 &round_state.id().to_bytes(),
607 Some(sort::SortKey::u32_asc(round_state.id().0)),
608 &serde_state,
609 )?;
610 self.inner.write().await.put(record).await
611 }
612
613 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
614 self.inner.write().await
615 .delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
616 Ok(())
617 }
618
619 async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
620 let record = self.inner.read().await
621 .get(partition::ROUND_STATE, &_id.to_bytes()).await?;
622 match record {
623 Some(r) => {
624 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
625 let id = RoundStateId(u32::from_be_bytes(pk_slice));
626 let state = r.to_data::<SerdeRoundState>()?.into();
627 Ok(Some(StoredRoundState::new(id, state)))
628 },
629 None => Ok(None),
630 }
631 }
632
633 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
634 let records = self.inner.read().await
635 .get_all(partition::ROUND_STATE).await?;
636 records.into_iter()
637 .map(|r| {
638 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
639 Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
640 })
641 .collect()
642 }
643
644 async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
645 let mut lock = self.inner.write().await;
646
647 for (vtxo, state) in vtxos {
648 let serde_vtxo = SerdeVtxo {
649 vtxo: (*vtxo).clone(),
650 states: vec![(*state).clone()],
651 };
652
653 let sk = sort::vtxo_sort_key(
654 state.kind(), vtxo.expiry_height(), vtxo.amount(),
655 );
656 let record = Record::from_data(
657 partition::VTXO,
658 &vtxo.id().to_bytes(),
659 Some(sk),
660 &serde_vtxo,
661 )?;
662 lock.put(record).await?;
663 }
664 Ok(())
665 }
666
667 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
668 let lock = self.inner.read().await;
669 match get_vtxo(&*lock, id).await? {
670 Some(serde_vtxo) => {
671 let state = serde_vtxo.current_state()
672 .context("vtxo has no state")?.clone();
673 Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
674 },
675 None => Ok(None),
676 }
677 }
678
679 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
680 let records = self.inner.read().await
681 .query_sorted(Query::new_full_range(partition::VTXO)).await?;
682
683 records
684 .into_iter()
685 .map(|r| {
686 let serde_vtxo = r.to_data::<SerdeVtxo>()?;
687 let state = serde_vtxo
688 .current_state()
689 .cloned()
690 .context("vtxo has no state")?;
691 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
692 })
693 .collect()
694 }
695
696 async fn get_vtxos_by_state(
697 &self,
698 states: &[VtxoStateKind],
699 ) -> anyhow::Result<Vec<WalletVtxo>> {
700 let lock = self.inner.read().await;
701
702 let range = |state: VtxoStateKind| {
703 let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
704 let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
705 (start, end)
706 };
707
708 let mut records = Vec::new();
709 for state in states {
710 let (start, end) = range(*state);
711 let query = Query::new(partition::VTXO, start..=end);
712
713 for record in lock.query_sorted(query).await? {
714 let serde_vtxo = record.to_data::<SerdeVtxo>()?;
715 let current_state = serde_vtxo.current_state()
716 .context("vtxo has no current state")?.clone();
717 debug_assert_eq!(current_state.kind(), *state);
718 records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
719 }
720 }
721
722 Ok(records)
723 }
724
725 async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
726 let lock = self.inner.read().await;
727 Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
728 }
729
730 async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
731 let lock = self.inner.read().await;
732 let mut out = Vec::with_capacity(ids.len());
733 for id in ids {
734 let serde_vtxo = get_vtxo(&*lock, *id).await?
735 .with_context(|| format!("vtxo {id} not found"))?;
736 out.push(serde_vtxo.vtxo);
737 }
738 Ok(out)
739 }
740
741 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
742 match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
743 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
744 None => Ok(None),
745 }
746 }
747
748 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
749 match self.get_wallet_vtxo(id).await? {
750 Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
751 None => Ok(false),
752 }
753 }
754
755 async fn update_vtxo_state_checked(
756 &self,
757 vtxo_id: VtxoId,
758 new_state: VtxoState,
759 allowed_old_states: &[VtxoStateKind],
760 ) -> anyhow::Result<WalletVtxo> {
761 let mut lock = self.inner.write().await;
762 update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
763 }
764
765 async fn update_vtxo_states_checked(
766 &self,
767 vtxo_ids: &[VtxoId],
768 new_state: VtxoState,
769 allowed_old_states: &[VtxoStateKind],
770 ) -> anyhow::Result<()> {
771 let mut lock = self.inner.write().await;
772 for id in vtxo_ids {
778 get_check_vtxo_state(&*lock, *id, allowed_old_states).await?;
779 }
780 for id in vtxo_ids {
781 update_vtxo_state_checked(&mut *lock, *id, new_state.clone(), allowed_old_states).await?;
782 }
783 Ok(())
784 }
785
786 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
787 let vtxo_key = SerdeVtxoKey { index, public_key };
788 let record = Record::from_data(
789 partition::PUBLIC_KEY,
790 &public_key.serialize()[..],
791 Some(sort::SortKey::u64_desc(index as u64)),
792 &vtxo_key,
793 )?;
794 self.inner.write().await.put(record).await
795 }
796
797 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
798 let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
800 let records = self.inner.read().await.query_sorted(query).await?;
801
802 match records.into_iter().next() {
803 Some(record) => {
804 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
805 Ok(Some(vtxo_key.index))
806 }
807 None => Ok(None),
808 }
809 }
810
811 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
812 match self.inner.read().await
813 .get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
814 {
815 Some(record) => {
816 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
817 Ok(Some(vtxo_key.index))
818 }
819 None => Ok(None),
820 }
821 }
822
823 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
824 match self.inner.read().await
825 .get(partition::MAILBOX_CHECKPOINT, &[]).await?
826 {
827 Some(record) => Ok(record.to_data::<u64>()?),
828 None => Ok(0),
829 }
830 }
831
832 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
833 let mut lock = self.inner.write().await;
834 let record = Record::from_data(
835 partition::MAILBOX_CHECKPOINT,
836 &[],
837 None,
838 &checkpoint,
839 )?;
840 lock.put(record).await?;
841 Ok(())
842 }
843
844 async fn upsert_wallet_action_checkpoint(
845 &self,
846 id: &WalletActionId,
847 checkpoint: &WalletActionCheckpoint,
848 ) -> anyhow::Result<()> {
849 let record = Record::from_data(
850 partition::WALLET_ACTION_CHECKPOINT,
851 id.as_bytes(),
852 None,
853 checkpoint,
854 )?;
855 self.inner.write().await.put(record).await
856 }
857
858 async fn get_wallet_action_checkpoint(
859 &self,
860 id: &WalletActionId,
861 ) -> anyhow::Result<Option<WalletActionCheckpoint>> {
862 match self.inner.read().await
863 .get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
864 {
865 Some(record) => Ok(Some(record.to_data()?)),
866 None => Ok(None),
867 }
868 }
869
870 async fn get_all_wallet_action_checkpoints(
871 &self,
872 ) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
873 let records = self.inner.read().await
874 .get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
875 records.into_iter().map(|r| r.to_data()).collect()
876 }
877
878 async fn remove_wallet_action_checkpoint(
879 &self,
880 id: &WalletActionId,
881 ) -> anyhow::Result<()> {
882 self.inner.write().await
883 .delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
884 Ok(())
885 }
886
887 async fn record_paid_invoice(
888 &self,
889 payment_hash: PaymentHash,
890 preimage: Preimage,
891 ) -> anyhow::Result<()> {
892 let key = payment_hash.to_byte_array();
893 let mut lock = self.inner.write().await;
895 if lock.get(partition::PAID_INVOICE, &key).await?.is_some() {
896 return Ok(());
897 }
898 let paid = PaidInvoice {
899 payment_hash,
900 preimage,
901 paid_at: chrono::Local::now(),
902 };
903 let record = Record::from_data(partition::PAID_INVOICE, &key, None, &paid)?;
904 lock.put(record).await
905 }
906
907 async fn get_paid_invoice(
908 &self,
909 payment_hash: PaymentHash,
910 ) -> anyhow::Result<Option<PaidInvoice>> {
911 match self.inner.read().await
912 .get(partition::PAID_INVOICE, &payment_hash.to_byte_array()).await?
913 {
914 Some(record) => Ok(Some(record.to_data()?)),
915 None => Ok(None),
916 }
917 }
918
919 async fn store_lightning_receive(
920 &self,
921 payment_hash: PaymentHash,
922 preimage: Preimage,
923 invoice: &Bolt11Invoice,
924 htlc_recv_cltv_delta: BlockDelta,
925 ) -> anyhow::Result<()> {
926 let lightning_receive = LightningReceive {
927 payment_hash,
928 payment_preimage: preimage,
929 invoice: invoice.clone(),
930 htlc_recv_cltv_delta,
931 htlc_vtxos: vec![],
932 movement_id: None,
933 finished_at: None,
934 preimage_revealed_at: None,
935 };
936
937 let record = Record::from_data(
938 partition::LIGHTNING_RECEIVE,
939 &payment_hash.to_byte_array(),
940 None,
941 &lightning_receive,
942 )?;
943 self.inner.write().await.put(record).await
944 }
945
946 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
947 let records = self.inner.read().await
948 .get_all(partition::LIGHTNING_RECEIVE).await?;
949 records
950 .into_iter()
951 .filter_map(|r| {
952 let receive = r.to_data::<LightningReceive>().ok()?;
953 if receive.finished_at.is_none() {
954 Some(Ok(receive))
955 } else {
956 None
957 }
958 })
959 .collect()
960 }
961
962 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
963 let mut lock = self.inner.write().await;
964
965 let pk = payment_hash.to_byte_array();
966 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
967 .context("lightning receive not found")?;
968 let mut lightning_receive: LightningReceive = record.to_data()?;
969
970 lightning_receive.preimage_revealed_at = Some(Local::now());
971
972 let updated_record = Record::from_data(
973 partition::LIGHTNING_RECEIVE,
974 &pk,
975 None,
976 &lightning_receive,
977 )?;
978 lock.put(updated_record).await
979 }
980
981 async fn update_lightning_receive(
982 &self,
983 payment_hash: PaymentHash,
984 vtxo_ids: &[VtxoId],
985 movement_id: MovementId,
986 ) -> anyhow::Result<()> {
987 let mut lock = self.inner.write().await;
988 let pk = payment_hash.to_byte_array();
989 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
990 .context("lightning receive not found")?;
991 let mut lightning_receive: LightningReceive = record.to_data()?;
992
993 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
994 for vtxo_id in vtxo_ids {
995 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
996 .context("vtxo not found")?;
997 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
998 }
999
1000 lightning_receive.htlc_vtxos = htlc_vtxos;
1001 lightning_receive.movement_id = Some(movement_id);
1002
1003 let updated_record = Record::from_data(
1004 partition::LIGHTNING_RECEIVE,
1005 &pk,
1006 None,
1007 &lightning_receive,
1008 )?;
1009 lock.put(updated_record).await
1010 }
1011
1012 async fn fetch_lightning_receive_by_payment_hash(
1013 &self,
1014 payment_hash: PaymentHash,
1015 ) -> anyhow::Result<Option<LightningReceive>> {
1016 match self.inner.read().await
1017 .get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1018 {
1019 Some(record) => Ok(Some(record.to_data()?)),
1020 None => Ok(None),
1021 }
1022 }
1023
1024 async fn finish_pending_lightning_receive(
1025 &self,
1026 payment_hash: PaymentHash,
1027 ) -> anyhow::Result<()> {
1028 let mut lock = self.inner.write().await;
1029 let pk = payment_hash.to_byte_array();
1030 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1031 .context("lightning receive not found")?;
1032 let mut lightning_receive: LightningReceive = record.to_data()?;
1033
1034 lightning_receive.finished_at = Some(Local::now());
1035
1036 let updated_record = Record::from_data(
1037 partition::LIGHTNING_RECEIVE,
1038 &pk,
1039 None,
1040 &lightning_receive,
1041 )?;
1042 lock.put(updated_record).await
1043 }
1044
1045 async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1046 let record = Record::from_data(
1047 partition::PENDING_OFFBOARD,
1048 &pending.movement_id.to_bytes(),
1049 None,
1050 pending,
1051 )?;
1052 self.inner.write().await.put(record).await
1053 }
1054
1055 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1056 let records = self.inner.read().await
1057 .get_all(partition::PENDING_OFFBOARD).await?;
1058 records.into_iter().map(|r| r.to_data()).collect()
1059 }
1060
1061 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1062 self.inner.write().await
1063 .delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1064 Ok(())
1065 }
1066
1067 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1068 let record = Record::from_data(
1069 partition::EXIT_VTXO,
1070 &exit.vtxo_id.to_bytes(),
1071 None,
1072 exit,
1073 )?;
1074 self.inner.write().await.put(record).await
1075 }
1076
1077 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1078 self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1079 Ok(())
1080 }
1081
1082 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1083 let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1084 records.into_iter().map(|r| r.to_data()).collect()
1085 }
1086
1087 async fn store_exit_child_tx(
1088 &self,
1089 exit_txid: Txid,
1090 child_tx: &Transaction,
1091 origin: ExitTxOrigin,
1092 ) -> anyhow::Result<()> {
1093 let exit_child = SerdeExitChildTx {
1094 child_tx: child_tx.clone(),
1095 origin,
1096 };
1097 let record = Record::from_data(
1098 partition::EXIT_CHILD_TX,
1099 &exit_txid.to_byte_array(),
1100 None,
1101 &exit_child,
1102 )?;
1103 self.inner.write().await.put(record).await
1104 }
1105
1106 async fn get_exit_child_tx(
1107 &self,
1108 exit_txid: Txid,
1109 ) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1110 match self.inner.read().await
1111 .get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1112 {
1113 Some(record) => {
1114 let exit_child = record.to_data::<SerdeExitChildTx>()?;
1115 Ok(Some((exit_child.child_tx, exit_child.origin)))
1116 }
1117 None => Ok(None),
1118 }
1119 }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124 use super::*;
1125
1126 #[test]
1127 fn storage_query_builder() {
1128 let query = Query::new_full_range(0).limit(10);
1129
1130 assert_eq!(query.partition, 0);
1131 assert_eq!(query.limit, Some(10));
1132 assert_eq!(query.range, ..);
1133 }
1134}
1135
1136#[cfg(test)]
1151pub mod test_suite {
1152 use super::*;
1153 use super::partition::LAST_IDS;
1154 use super::sort::SortKey;
1155
1156 async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1157 for partition in partitions {
1158 let records = storage.get_all(*partition).await?;
1159 for record in records {
1160 storage.delete(record.partition, &record.pk).await?;
1161 }
1162 }
1163 Ok(())
1164 }
1165
1166 pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1168 test_put_insert(storage).await;
1170 test_put_upsert(storage).await;
1171 test_put_with_sort_key(storage).await;
1172 test_put_without_sort_key(storage).await;
1173 test_put_multiple_partitions(storage).await;
1174
1175 test_get_existing(storage).await;
1177 test_get_after_update(storage).await;
1178
1179 test_delete_existing(storage).await;
1181 test_delete_nonexistent(storage).await;
1182 test_delete_idempotent(storage).await;
1183
1184 test_query_empty_partition(storage).await;
1186 test_query_returns_partition_records(storage).await;
1187 test_query_ordering(storage).await;
1188 test_query_with_limit(storage).await;
1189 test_query_null_sort_key_excluded(storage).await;
1190 test_query_partition_isolation(storage).await;
1191 test_query_range(storage).await;
1192 test_query_exclusive_end_range(storage).await;
1193 test_query_full_range_limit_one(storage).await;
1194
1195 test_get_all_empty_partition(storage).await;
1197 test_get_all_returns_all_records(storage).await;
1198 test_get_all_includes_records_without_sort_key(storage).await;
1199 test_get_all_partition_isolation(storage).await;
1200 test_get_all_after_delete(storage).await;
1201
1202 test_incremental_id_starts_at_one(storage).await;
1204 test_incremental_id_increments(storage).await;
1205 test_incremental_id_partition_isolation(storage).await;
1206 test_incremental_id_persists_across_operations(storage).await;
1207 }
1208
1209 pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1211 let record = Record {
1212 pk: "put_insert_1".into(),
1213 partition: 0,
1214 sort_key: None,
1215 data: b"test data".to_vec(),
1216 };
1217
1218 storage.put(record).await.expect("put should succeed");
1219
1220 let retrieved = storage
1221 .get(0, b"put_insert_1")
1222 .await
1223 .expect("get should succeed")
1224 .expect("record should exist");
1225
1226 assert_eq!(retrieved.pk, b"put_insert_1");
1227 assert_eq!(retrieved.partition, 0);
1228 assert_eq!(retrieved.data, b"test data");
1229 }
1230
1231 pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1233 let record1 = Record {
1234 pk: b"put_upsert_1".into(),
1235 partition: 0,
1236 sort_key: None,
1237 data: b"original".to_vec(),
1238 };
1239 storage.put(record1).await.expect("first put should succeed");
1240
1241 let record2 = Record {
1242 pk: "put_upsert_1".into(),
1243 partition: 0,
1244 sort_key: None,
1245 data: b"updated".to_vec(),
1246 };
1247 storage
1248 .put(record2)
1249 .await
1250 .expect("second put should succeed");
1251
1252 let retrieved = storage
1253 .get(0, b"put_upsert_1")
1254 .await
1255 .expect("get should succeed")
1256 .expect("record should exist");
1257
1258 assert_eq!(retrieved.data, b"updated", "data should be updated");
1259 }
1260
1261 pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1263 let sort_key = SortKey::u32_asc(42);
1264 let record = Record {
1265 pk: b"put_sort_key_1".into(),
1266 partition: 0,
1267 sort_key: Some(sort_key.clone()),
1268 data: b"with sort key".to_vec(),
1269 };
1270
1271 storage.put(record).await.expect("put should succeed");
1272
1273 let retrieved = storage
1274 .get(0, b"put_sort_key_1")
1275 .await
1276 .expect("get should succeed")
1277 .expect("record should exist");
1278
1279 assert_eq!(retrieved.sort_key, Some(sort_key));
1280 }
1281
1282 pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1284 let record = Record {
1285 pk: b"put_no_sort_key_1".into(),
1286 partition: 0,
1287 sort_key: None,
1288 data: b"no sort key".to_vec(),
1289 };
1290
1291 storage.put(record).await.expect("put should succeed");
1292
1293 let retrieved = storage
1294 .get(0, b"put_no_sort_key_1")
1295 .await
1296 .expect("get should succeed")
1297 .expect("record should exist");
1298
1299 assert!(retrieved.sort_key.is_none());
1300 }
1301
1302 pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1304 let record_a = Record {
1305 pk: "put_multi_a".into(),
1306 partition: 0,
1307 sort_key: None,
1308 data: b"in partition a".to_vec(),
1309 };
1310 let record_b = Record {
1311 pk: "put_multi_b".into(),
1312 partition: 1,
1313 sort_key: None,
1314 data: b"in partition b".to_vec(),
1315 };
1316
1317 storage.put(record_a).await.expect("put a should succeed");
1318 storage.put(record_b).await.expect("put b should succeed");
1319
1320 let retrieved_a = storage
1321 .get(0, b"put_multi_a")
1322 .await
1323 .expect("get should succeed")
1324 .expect("record a should exist");
1325 let retrieved_b = storage
1326 .get(1, b"put_multi_b")
1327 .await
1328 .expect("get should succeed")
1329 .expect("record b should exist");
1330
1331 assert_eq!(retrieved_a.partition, 0);
1332 assert_eq!(retrieved_b.partition, 1);
1333 }
1334
1335 pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1337 let record = Record {
1338 pk: b"get_existing_1".into(),
1339 partition: 0,
1340 sort_key: Some(SortKey::u32_asc(100)),
1341 data: b"test".to_vec(),
1342 };
1343 storage.put(record).await.expect("put should succeed");
1344
1345 let retrieved = storage
1346 .get(0, b"get_existing_1")
1347 .await
1348 .expect("get should succeed");
1349
1350 assert!(retrieved.is_some());
1351 let retrieved = retrieved.unwrap();
1352 assert_eq!(retrieved.pk, b"get_existing_1");
1353 assert_eq!(retrieved.partition, 0);
1354 assert_eq!(retrieved.data, b"test");
1355
1356 assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1358 assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1360
1361 assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1363 }
1364
1365 pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1367 let record1 = Record {
1368 pk: b"get_after_update_1".into(),
1369 partition: 0,
1370 sort_key: None,
1371 data: b"version1".to_vec(),
1372 };
1373 storage.put(record1).await.expect("put should succeed");
1374
1375 let record2 = Record {
1376 pk: b"get_after_update_1".into(),
1377 partition: 0,
1378 sort_key: None,
1379 data: b"version2".to_vec(),
1380 };
1381 storage.put(record2).await.expect("put should succeed");
1382
1383 let retrieved = storage
1384 .get(0, b"get_after_update_1")
1385 .await
1386 .expect("get should succeed")
1387 .expect("record should exist");
1388
1389 assert_eq!(retrieved.data, b"version2");
1390 }
1391
1392 pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1394 let record = Record {
1395 pk: b"delete_existing_1".into(),
1396 partition: 0,
1397 sort_key: None,
1398 data: b"to delete".to_vec(),
1399 };
1400 storage.put(record.clone()).await.expect("put should succeed");
1401
1402 let deleted_record = storage
1403 .delete(0, b"delete_existing_1")
1404 .await
1405 .expect("delete should succeed");
1406
1407 assert_eq!(deleted_record, Some(record));
1408
1409 let retrieved = storage
1410 .get(0, b"delete_existing_1")
1411 .await
1412 .expect("get should succeed");
1413 assert!(retrieved.is_none(), "record should no longer exist");
1414 }
1415
1416 pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1418 let deleted_record = storage
1419 .delete(0, b"delete_nonexistent_does_not_exist")
1420 .await
1421 .expect("delete should succeed");
1422
1423 assert!(
1424 deleted_record.is_none(),
1425 "delete should return None for non-existent record"
1426 );
1427 }
1428
1429 pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1431 let record = Record {
1432 pk: b"delete_idempotent_1".into(),
1433 partition: 0,
1434 sort_key: None,
1435 data: b"delete twice".to_vec(),
1436 };
1437 storage.put(record.clone()).await.expect("put should succeed");
1438
1439 let first_delete = storage
1440 .delete(0, b"delete_idempotent_1")
1441 .await
1442 .expect("first delete should succeed");
1443 let second_delete = storage
1444 .delete(0, b"delete_idempotent_1")
1445 .await
1446 .expect("second delete should succeed");
1447
1448 assert_eq!(first_delete, Some(record), "first delete should return the record");
1449 assert_eq!(second_delete, None, "second delete should return None");
1450 }
1451
1452 pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1454 clear_partitions(storage, &[0]).await.unwrap();
1455 let results = storage
1456 .query_sorted(Query::new_full_range(0))
1457 .await
1458 .expect("query should succeed");
1459
1460 assert!(results.is_empty());
1461 }
1462
1463 pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1465 clear_partitions(storage, &[0]).await.unwrap();
1466 for i in 0..3 {
1467 let record = Record {
1468 pk: format!("query_partition_{}", i).into(),
1469 partition: 0,
1470 sort_key: Some(SortKey::u32_asc(i)),
1471 data: format!("record_{}", i).as_bytes().to_vec(),
1472 };
1473 storage.put(record).await.expect("put should succeed");
1474 }
1475
1476 let results = storage
1477 .query_sorted(Query::new_full_range(0))
1478 .await
1479 .expect("query should succeed");
1480
1481 assert_eq!(results.len(), 3);
1482 }
1483
1484 pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1486 clear_partitions(storage, &[0]).await.unwrap();
1487 for i in [5, 2, 8, 1, 9] {
1489 let record = Record {
1490 pk: format!("query_asc_{}", i).into(),
1491 partition: 0,
1492 sort_key: Some(SortKey::u32_asc(i)),
1493 data: format!("record_{}", i).as_bytes().to_vec(),
1494 };
1495 storage.put(record).await.expect("put should succeed");
1496 }
1497
1498 let results = storage
1499 .query_sorted(Query::new_full_range(0))
1500 .await
1501 .expect("query should succeed");
1502
1503 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1504 assert_eq!(
1505 values,
1506 vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1507 "should be in ascending order"
1508 );
1509 }
1510
1511 pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1513 clear_partitions(storage, &[0]).await.unwrap();
1514 for i in 0..10 {
1515 let record = Record {
1516 pk: format!("query_limit_{}", i).into(),
1517 partition: 0,
1518 sort_key: Some(SortKey::u32_asc(i)),
1519 data: format!("record_{}", i).as_bytes().to_vec(),
1520 };
1521 storage.put(record).await.expect("put should succeed");
1522 }
1523
1524 let results = storage
1525 .query_sorted(Query::new_full_range(0).limit(3))
1526 .await
1527 .expect("query should succeed");
1528
1529 assert_eq!(results.len(), 3);
1530 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1531 assert_eq!(
1532 values,
1533 vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1534 "should return first 3 records"
1535 );
1536 }
1537
1538 pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1540 clear_partitions(storage, &[0]).await.unwrap();
1541 let with_key_1 = Record {
1543 pk: "query_null_with_1".into(),
1544 partition: 0,
1545 sort_key: Some(SortKey::u32_asc(1)),
1546 data: b"with_key_1".to_vec(),
1547 };
1548 let with_key_2 = Record {
1549 pk: "query_null_with_2".into(),
1550 partition: 0,
1551 sort_key: Some(SortKey::u32_asc(2)),
1552 data: b"with_key_2".to_vec(),
1553 };
1554
1555 let without_key = Record {
1557 pk: "query_null_without".into(),
1558 partition: 0,
1559 sort_key: None,
1560 data: b"no_key".to_vec(),
1561 };
1562
1563 storage.put(with_key_1).await.expect("put should succeed");
1564 storage.put(without_key).await.expect("put should succeed");
1565 storage.put(with_key_2).await.expect("put should succeed");
1566
1567 let results_query = storage.query_sorted(Query::new_full_range(0)).await
1569 .expect("query should succeed");
1570 assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1571 assert_eq!(results_query[0].data, b"with_key_1");
1572 assert_eq!(results_query[1].data, b"with_key_2");
1573
1574 let results_all = storage.get_all(0).await
1576 .expect("get_all should succeed");
1577 assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1578
1579 let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1581 let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1582 let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1583 assert!(has_with_key_1, "get_all should include with_key_1");
1584 assert!(has_with_key_2, "get_all should include with_key_2");
1585 assert!(has_without_key, "get_all should include record without sort key");
1586 }
1587
1588 pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1590 clear_partitions(storage, &[0, 1]).await.unwrap();
1591 for i in 0..3 {
1593 let record = Record {
1594 pk: format!("query_iso_a_{}", i).into(),
1595 partition: 0,
1596 sort_key: Some(SortKey::u32_asc(i)),
1597 data: format!("record_{}", i).as_bytes().to_vec(),
1598 };
1599 storage.put(record).await.expect("put should succeed");
1600 }
1601
1602 for i in 0..5 {
1604 let record = Record {
1605 pk: format!("query_iso_b_{}", i).into(),
1606 partition: 1,
1607 sort_key: Some(SortKey::u32_asc(i)),
1608 data: format!("record_{}", i + 100).as_bytes().to_vec(),
1609 };
1610 storage.put(record).await.expect("put should succeed");
1611 }
1612
1613 let results_a = storage
1614 .query_sorted(Query::new_full_range(0))
1615 .await
1616 .expect("query should succeed");
1617
1618 let results_b = storage
1619 .query_sorted(Query::new_full_range(1))
1620 .await
1621 .expect("query should succeed");
1622
1623 assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1624 assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1625
1626 assert!(results_a
1628 .iter()
1629 .all(|r| r.partition == 0));
1630
1631 assert!(results_b
1633 .iter()
1634 .all(|r| r.partition == 1));
1635 }
1636
1637 pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1639 clear_partitions(storage, &[0]).await.unwrap();
1640
1641 for i in 1..=10u32 {
1643 let record = Record {
1644 pk: format!("query_range_{}", i).into(),
1645 partition: 0,
1646 sort_key: Some(SortKey::u32_asc(i)),
1647 data: format!("record_{}", i).as_bytes().to_vec(),
1648 };
1649 storage.put(record).await.expect("put should succeed");
1650 }
1651
1652 let results_start = storage
1654 .query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1655 .await
1656 .expect("query should succeed");
1657
1658 assert_eq!(results_start.len(), 6, "should return records 5-10");
1659 let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1660 assert_eq!(
1661 values,
1662 vec![
1663 b"record_5".to_vec(),
1664 b"record_6".to_vec(),
1665 b"record_7".to_vec(),
1666 b"record_8".to_vec(),
1667 b"record_9".to_vec(),
1668 b"record_10".to_vec(),
1669 ],
1670 "should return records from 5 onwards"
1671 );
1672
1673 let results_end = storage
1675 .query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1676 .await
1677 .expect("query should succeed");
1678
1679 assert_eq!(results_end.len(), 3, "should return records 1-3");
1680 let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1681 assert_eq!(
1682 values,
1683 vec![
1684 b"record_1".to_vec(),
1685 b"record_2".to_vec(),
1686 b"record_3".to_vec(),
1687 ],
1688 "should return records up to 3"
1689 );
1690
1691 let results_range = storage
1693 .query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1694 .await
1695 .expect("query should succeed");
1696
1697 assert_eq!(results_range.len(), 5, "should return records 3-7");
1698 let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1699 assert_eq!(
1700 values,
1701 vec![
1702 b"record_3".to_vec(),
1703 b"record_4".to_vec(),
1704 b"record_5".to_vec(),
1705 b"record_6".to_vec(),
1706 b"record_7".to_vec(),
1707 ],
1708 "should return records in range 3-7"
1709 );
1710
1711 let results_range_limit = storage
1713 .query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1714 .await
1715 .expect("query should succeed");
1716
1717 assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1718 let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1719 assert_eq!(
1720 values,
1721 vec![
1722 b"record_2".to_vec(),
1723 b"record_3".to_vec(),
1724 b"record_4".to_vec(),
1725 ],
1726 "should return first 3 records in range"
1727 );
1728
1729 let results_empty = storage
1731 .query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1732 .await
1733 .expect("query should succeed");
1734
1735 assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1736 }
1737
1738 pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1740 clear_partitions(storage, &[0]).await.unwrap();
1741
1742 for i in 1..=10u32 {
1743 let record = Record {
1744 pk: format!("query_excl_{}", i).into(),
1745 partition: 0,
1746 sort_key: Some(SortKey::u32_asc(i)),
1747 data: format!("record_{}", i).as_bytes().to_vec(),
1748 };
1749 storage.put(record).await.expect("put should succeed");
1750 }
1751
1752 let results = storage
1754 .query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1755 .await
1756 .expect("query should succeed");
1757
1758 assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1759 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1760 assert_eq!(
1761 values,
1762 vec![
1763 b"record_3".to_vec(),
1764 b"record_4".to_vec(),
1765 b"record_5".to_vec(),
1766 b"record_6".to_vec(),
1767 ],
1768 );
1769
1770 let results = storage
1772 .query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1773 .await
1774 .expect("query should succeed");
1775
1776 assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1777 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1778 assert_eq!(
1779 values,
1780 vec![
1781 b"record_1".to_vec(),
1782 b"record_2".to_vec(),
1783 b"record_3".to_vec(),
1784 ],
1785 );
1786 }
1787
1788 pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1790 clear_partitions(storage, &[0]).await.unwrap();
1791
1792 for i in [5u32, 2, 8, 1, 9] {
1793 let record = Record {
1794 pk: format!("query_limit1_{}", i).into(),
1795 partition: 0,
1796 sort_key: Some(SortKey::u32_asc(i)),
1797 data: format!("record_{}", i).as_bytes().to_vec(),
1798 };
1799 storage.put(record).await.expect("put should succeed");
1800 }
1801
1802 let results = storage
1803 .query_sorted(Query::new_full_range(0).limit(1))
1804 .await
1805 .expect("query should succeed");
1806
1807 assert_eq!(results.len(), 1);
1808 assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1809 }
1810
1811 pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1813 storage.delete(LAST_IDS, b"0").await.unwrap();
1815 let id = storage.incremental_id(0).await
1816 .expect("incremental_id should succeed");
1817
1818 assert_eq!(id, 1, "first id should be 1");
1819 }
1820
1821 pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1823 clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1824
1825 let id1 = storage.incremental_id(0).await
1826 .expect("incremental_id should succeed");
1827 let id2 = storage.incremental_id(0).await
1828 .expect("incremental_id should succeed");
1829 let id3 = storage.incremental_id(0).await
1830 .expect("incremental_id should succeed");
1831
1832 assert_eq!(id1, 1, "first id should be 1");
1833 assert_eq!(id2, 2, "second id should be 2");
1834 assert_eq!(id3, 3, "third id should be 3");
1835 }
1836
1837 pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1839 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1840
1841 let a1 = storage.incremental_id(0).await
1843 .expect("incremental_id should succeed");
1844 let a2 = storage.incremental_id(0).await
1845 .expect("incremental_id should succeed");
1846 let a3 = storage.incremental_id(0).await
1847 .expect("incremental_id should succeed");
1848
1849 let b1 = storage.incremental_id(1).await
1851 .expect("incremental_id should succeed");
1852 let b2 = storage.incremental_id(1).await
1853 .expect("incremental_id should succeed");
1854
1855 assert_eq!(a1, 1);
1857 assert_eq!(a2, 2);
1858 assert_eq!(a3, 3);
1859
1860 assert_eq!(b1, 1);
1862 assert_eq!(b2, 2);
1863
1864 let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1866 assert_eq!(a4, 4);
1867 }
1868
1869 pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1871 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1872
1873 let id1 = storage.incremental_id(0).await
1875 .expect("incremental_id should succeed");
1876 let id2 = storage.incremental_id(0).await
1877 .expect("incremental_id should succeed");
1878 assert_eq!(id1, 1);
1879 assert_eq!(id2, 2);
1880
1881 let stored = storage
1883 .get(LAST_IDS, &[0])
1884 .await
1885 .expect("get should succeed")
1886 .expect("id record should exist");
1887 let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1888 assert_eq!(stored_id, 2, "stored id should be 2");
1889
1890 let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1892 assert_eq!(id3, 3);
1893 }
1894
1895 pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1897 clear_partitions(storage, &[0]).await.unwrap();
1898 let results = storage
1899 .get_all(0)
1900 .await
1901 .expect("get_all should succeed");
1902
1903 assert!(results.is_empty(), "get_all should return empty for empty partition");
1904 }
1905
1906 pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1908 clear_partitions(storage, &[0]).await.unwrap();
1909
1910 for i in 0..5 {
1912 let record = Record {
1913 pk: format!("get_all_{}", i).into(),
1914 partition: 0,
1915 sort_key: Some(SortKey::u32_asc(i)),
1916 data: format!("record_{}", i).as_bytes().to_vec(),
1917 };
1918 storage.put(record).await.expect("put should succeed");
1919 }
1920
1921 let results = storage
1922 .get_all(0)
1923 .await
1924 .expect("get_all should succeed");
1925
1926 assert_eq!(results.len(), 5, "get_all should return all 5 records");
1927
1928 for i in 0..5 {
1930 let expected_data = format!("record_{}", i).as_bytes().to_vec();
1931 let found = results.iter().any(|r| r.data == expected_data);
1932 assert!(found, "get_all should include record_{}", i);
1933 }
1934 }
1935
1936 pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1938 clear_partitions(storage, &[0]).await.unwrap();
1939
1940 let with_key_1 = Record {
1942 pk: "get_all_with_1".into(),
1943 partition: 0,
1944 sort_key: Some(SortKey::u32_asc(1)),
1945 data: b"with_key_1".to_vec(),
1946 };
1947 let with_key_2 = Record {
1948 pk: "get_all_with_2".into(),
1949 partition: 0,
1950 sort_key: Some(SortKey::u32_asc(2)),
1951 data: b"with_key_2".to_vec(),
1952 };
1953
1954 let without_key_1 = Record {
1956 pk: "get_all_without_1".into(),
1957 partition: 0,
1958 sort_key: None,
1959 data: b"without_key_1".to_vec(),
1960 };
1961 let without_key_2 = Record {
1962 pk: "get_all_without_2".into(),
1963 partition: 0,
1964 sort_key: None,
1965 data: b"without_key_2".to_vec(),
1966 };
1967
1968 storage.put(with_key_1).await.expect("put should succeed");
1969 storage.put(without_key_1).await.expect("put should succeed");
1970 storage.put(with_key_2).await.expect("put should succeed");
1971 storage.put(without_key_2).await.expect("put should succeed");
1972
1973 let results = storage
1974 .get_all(0)
1975 .await
1976 .expect("get_all should succeed");
1977
1978 assert_eq!(results.len(), 4, "get_all should return all 4 records");
1979
1980 let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1982 let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1983 let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1984 let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1985
1986 assert!(has_with_1, "get_all should include with_key_1");
1987 assert!(has_with_2, "get_all should include with_key_2");
1988 assert!(has_without_1, "get_all should include without_key_1");
1989 assert!(has_without_2, "get_all should include without_key_2");
1990
1991 let query_results = storage
1993 .query_sorted(Query::new_full_range(0))
1994 .await
1995 .expect("query should succeed");
1996
1997 assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
1998 let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
1999 assert!(!query_has_without, "query should not include records without sort keys");
2000 }
2001
2002 pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2004 clear_partitions(storage, &[0, 1]).await.unwrap();
2005
2006 for i in 0..3 {
2008 let record = Record {
2009 pk: format!("partition_0_{}", i).into(),
2010 partition: 0,
2011 sort_key: Some(SortKey::u32_asc(i)),
2012 data: format!("p0_record_{}", i).as_bytes().to_vec(),
2013 };
2014 storage.put(record).await.expect("put should succeed");
2015 }
2016
2017 for i in 0..2 {
2019 let record = Record {
2020 pk: format!("partition_1_{}", i).into(),
2021 partition: 1,
2022 sort_key: Some(SortKey::u32_asc(i)),
2023 data: format!("p1_record_{}", i).as_bytes().to_vec(),
2024 };
2025 storage.put(record).await.expect("put should succeed");
2026 }
2027
2028 let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2030 assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2031 assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2032
2033 let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2035 assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2036 assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2037 }
2038
2039 pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2041 clear_partitions(storage, &[0]).await.unwrap();
2042
2043 for i in 0..3u32 {
2044 let record = Record {
2045 pk: format!("get_all_del_{}", i).into(),
2046 partition: 0,
2047 sort_key: Some(SortKey::u32_asc(i)),
2048 data: format!("record_{}", i).as_bytes().to_vec(),
2049 };
2050 storage.put(record).await.expect("put should succeed");
2051 }
2052
2053 storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2054
2055 let results = storage.get_all(0).await.expect("get_all should succeed");
2056 assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2057
2058 let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2059 assert!(!has_deleted, "deleted record should not appear");
2060 }
2061}