1mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71 Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75 LightningReceive, PaidInvoice, PendingBoard, PendingOffboard,
76 RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77 StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
84pub mod partition {
85 pub const PROPERTIES: u8 = 0;
86 #[allow(unused)]
87 pub const BDK_CHANGESET: u8 = 1;
88 pub const VTXO: u8 = 2;
89 pub const PUBLIC_KEY: u8 = 3;
90 pub const PENDING_BOARD: u8 = 4;
91 pub const ROUND_STATE: u8 = 5;
92 pub const MOVEMENT: u8 = 6;
93 #[allow(unused)]
95 pub const LEGACY_LIGHTNING_SEND: u8 = 7;
96 pub const LIGHTNING_RECEIVE: u8 = 8;
97 pub const EXIT_VTXO: u8 = 9;
98 pub const EXIT_CHILD_TX: u8 = 10;
99 pub const MAILBOX_CHECKPOINT: u8 = 11;
100 pub const PENDING_OFFBOARD: u8 = 12;
101 pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
103 pub const WALLET_ACTION_CHECKPOINT: u8 = 14;
105 pub const PAID_INVOICE: u8 = 15;
108
109 pub const LAST_IDS: u8 = u8::MAX;
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114pub struct Record {
115 pub partition: u8,
119
120 pub pk: Vec<u8>,
122
123 pub sort_key: Option<SortKey>,
132
133 pub data: Vec<u8>,
135}
136
137impl Record {
138 fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
140 serde_json::from_slice(&self.data).map_err(Into::into)
141 }
142
143 fn from_data<T: Serialize>(
145 partition: u8,
146 pk: &[u8],
147 sort_key: Option<SortKey>,
148 data: &T,
149 ) -> anyhow::Result<Record> {
150 Ok(Record {
151 partition,
152 pk: pk.to_vec(),
153 sort_key,
154 data: serde_json::to_vec(data)?,
155 })
156 }
157}
158
159pub trait QueryRange: RangeBounds<SortKey> + Send {}
161
162impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
163
164#[derive(Debug, Clone)]
166pub struct Query<R: QueryRange> {
167 pub partition: u8,
169
170 pub range: R,
172
173 pub limit: Option<usize>,
175}
176
177impl<R: QueryRange> Query<R> {
178 pub fn new(partition: u8, range: R) -> Self {
180 Self {
181 partition,
182 range,
183 limit: None,
184 }
185 }
186
187 pub fn limit(mut self, limit: usize) -> Self {
192 self.limit = Some(limit);
193 self
194 }
195}
196
197impl Query<std::ops::RangeFull> {
198 pub fn new_full_range(partition: u8) -> Self {
199 Self::new(partition, ..)
200 }
201}
202
203fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
204 let body = pm.value_string();
205
206 let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
207 buf.extend(pm.type_str().as_bytes().iter().copied());
208 buf.push(0xfe);
211 buf.extend(body.into_bytes());
212 buf
213}
214
215#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
262#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
263pub trait StorageAdaptor: Send + Sync + 'static {
264 async fn put(&mut self, record: Record) -> anyhow::Result<()>;
266
267 async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272 async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
276
277 async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
283 -> anyhow::Result<Vec<Record>>;
284
285 async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
289
290 async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
292 let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
293 .map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
294 let next_partition_id = last_partition_id + 1;
295
296 let record = Record::from_data(
297 partition::LAST_IDS,
298 &[partition],
299 None,
300 &next_partition_id,
301 )?;
302
303 self.put(record).await?;
304 Ok(next_partition_id)
305 }
306}
307
308async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
309 match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
310 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
311 None => Ok(None),
312 }
313}
314
315async fn get_check_vtxo_state<S: StorageAdaptor>(
316 adaptor: &S,
317 vtxo_id: VtxoId,
318 allowed_states: &[VtxoStateKind],
319) -> anyhow::Result<SerdeVtxo> {
320 let vtxo = get_vtxo(adaptor, vtxo_id).await?
321 .context("vtxo not found")?;
322
323 let current_state = vtxo.current_state().context("vtxo has no state")?;
324 if !allowed_states.contains(¤t_state.kind()) {
325 bail!("current state {:?} not in allowed states {:?}",
326 current_state.kind(), allowed_states
327 );
328 }
329
330 Ok(vtxo)
331}
332
333async fn update_vtxo_state_checked<S: StorageAdaptor>(
334 adaptor: &mut S,
335 vtxo_id: VtxoId,
336 new_state: VtxoState,
337 allowed_old_states: &[VtxoStateKind],
338) -> anyhow::Result<WalletVtxo> {
339 let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
340
341 let sk = sort::vtxo_sort_key(
342 new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
343 );
344
345 serde_vtxo.states.push(new_state.clone());
346 let updated_record = Record::from_data(
347 partition::VTXO,
348 &vtxo_id.to_bytes(),
349 Some(sk),
350 &serde_vtxo,
351 )?;
352
353 adaptor.put(updated_record).await?;
354
355 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
356}
357
358pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
359 inner: tokio::sync::RwLock<S>,
360}
361
362impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
363 pub fn new(inner: S) -> Self {
364 Self {
365 inner: tokio::sync::RwLock::new(inner),
366 }
367 }
368}
369
370#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
372#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
373impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
374 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
375 let record = Record::from_data(
376 partition::PROPERTIES,
377 &[],
379 None,
380 properties,
381 )?;
382 self.inner.write().await.put(record).await
383 }
384
385 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
386 match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
387 Some(record) => Ok(Some(record.to_data()?)),
388 None => Ok(None),
389 }
390 }
391
392 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
393 let mut properties = match self.read_properties().await? {
394 Some(properties) => properties,
395 None => bail!("wallet not initialized"),
396 };
397
398 properties.server_pubkey = Some(server_pubkey);
399
400 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
401 self.inner.write().await.put(record).await
402 }
403
404 async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
405 let mut properties = match self.read_properties().await? {
406 Some(properties) => properties,
407 None => bail!("wallet not initialized"),
408 };
409
410 properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
411
412 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
413 self.inner.write().await.put(record).await
414 }
415
416 #[cfg(feature = "onchain-bdk")]
417 async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
418 match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
419 Some(record) => record.to_data(),
420 None => Ok(ChangeSet::default()),
421 }
422 }
423
424 #[cfg(feature = "onchain-bdk")]
425 async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
426 let mut current = self.initialize_bdk_wallet().await?;
427 current.merge(changeset.clone());
428
429 let record = Record::from_data(
430 partition::BDK_CHANGESET,
431 &[],
433 None,
434 ¤t,
435 )?;
436 self.inner.write().await.put(record).await
437 }
438
439 async fn create_new_movement(
440 &self,
441 status: MovementStatus,
442 subsystem: &MovementSubsystem,
443 time: DateTime<Local>,
444 ) -> anyhow::Result<MovementId> {
445 let mut lock = self.inner.write().await;
446
447 let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
448 let movement = Movement::new(id, status, subsystem, time);
449
450 let record = Record::from_data(
451 partition::MOVEMENT,
452 &id.to_bytes(),
453 Some(sort::movement_sort_key(&time)),
454 &movement,
455 )?;
456 lock.put(record).await?;
457
458 Ok(id)
459 }
460
461 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
462 let mut guard = self.inner.write().await;
463
464 let record = Record::from_data(
465 partition::MOVEMENT,
466 &movement.id.to_bytes(),
467 Some(sort::movement_sort_key(&movement.time.created_at)),
468 movement,
469 )?;
470 guard.put(record).await?;
471
472 let sent = movement.sent_to.iter().map(|d| &d.destination);
474 let rcvd = movement.received_on.iter().map(|d| &d.destination);
475 for pm in sent.chain(rcvd) {
476 let pm_bytes = serialize_payment_method(pm);
477 let primary_key = {
478 let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
480 buf.extend(pm_bytes.iter().copied());
481 buf.extend(movement.id.to_bytes());
482 buf
483 };
484 let record = Record::from_data(
485 partition::MOVEMENT_PAYMENT_METHOD,
486 &primary_key,
487 Some(SortKey::from_bytes(pm_bytes)),
488 &movement.id.0,
489 )?;
490 guard.put(record).await?;
491 }
492
493 Ok(())
494 }
495
496 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
497 self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
498 .await?
499 .context("movement not found")?
500 .to_data()
501 }
502
503 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
504 let records = self.inner.read().await
505 .query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
506 records.into_iter().map(|r| r.to_data()).collect()
507 }
508
509 async fn get_movements_by_payment_method(
510 &self,
511 payment_method: &PaymentMethod,
512 ) -> anyhow::Result<Vec<Movement>> {
513 let pm_bytes = serialize_payment_method(payment_method);
514 let sort_key = SortKey::from_bytes(pm_bytes);
515
516 let guard = self.inner.read().await;
517 let idx_recs = guard.query_sorted(Query::new(
518 partition::MOVEMENT_PAYMENT_METHOD,
519 sort_key.clone()..=sort_key,
520 )).await?;
521
522 let mut ret = Vec::with_capacity(idx_recs.len());
523 for idx_rec in idx_recs {
524 let id = MovementId::new(idx_rec.to_data::<u32>()
525 .context("corrupt db: movement payment method index value")?);
526
527 ret.push(
528 guard.get(partition::MOVEMENT, &id.to_bytes()).await?
529 .context("corrupt db: movement payment method entry for unknown movement")?
530 .to_data()
531 .context("corrupt db: invalid movement record")?
532 );
533 }
534 Ok(ret)
535 }
536
537 async fn store_pending_board(
538 &self,
539 vtxo: &Vtxo<Full>,
540 funding_tx: &Transaction,
541 movement_id: MovementId,
542 ) -> anyhow::Result<()> {
543 let pending_board = PendingBoard {
544 vtxos: vec![vtxo.id()],
545 amount: vtxo.amount(),
546 funding_tx: funding_tx.clone(),
547 movement_id,
548 };
549
550 let record = Record::from_data(
551 partition::PENDING_BOARD,
552 &vtxo.id().to_bytes(),
553 None,
554 &pending_board,
555 )?;
556
557 self.inner.write().await.put(record).await
558 }
559
560 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
561 self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
562 Ok(())
563 }
564
565 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
566 let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
567 records
568 .into_iter()
569 .map(|r| {
570 let board = r.to_data::<PendingBoard>()?;
571 Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
572 })
573 .collect()
574 }
575
576 async fn get_pending_board_by_vtxo_id(
577 &self,
578 vtxo_id: VtxoId,
579 ) -> anyhow::Result<Option<PendingBoard>> {
580 match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
581 Some(record) => Ok(Some(record.to_data()?)),
582 None => Ok(None),
583 }
584 }
585
586 async fn store_round_state_lock_vtxos(
587 &self,
588 round_state: &RoundState,
589 ) -> anyhow::Result<RoundStateId> {
590 let mut lock = self.inner.write().await;
591
592 let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
593
594 let allowed_states = &[VtxoStateKind::Spendable];
595
596 for vtxo in round_state.participation().inputs.iter() {
598 get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
599 }
600
601 for vtxo in round_state.participation().inputs.iter() {
602 update_vtxo_state_checked(
603 &mut *lock,
604 vtxo.id(),
605 VtxoState::Locked {
606 holder: round_state.movement_id
607 .map(|id| crate::vtxo::VtxoLockHolder::Movement { id }),
608 },
609 allowed_states,
610 ).await?;
611 }
612
613 let serde_state = SerdeRoundState::from(round_state);
614 let record = Record::from_data(
615 partition::ROUND_STATE,
616 &id.to_bytes(),
617 Some(sort::SortKey::u32_asc(id.0)),
618 &serde_state,
619 )?;
620 lock.put(record).await?;
621
622 Ok(id)
623 }
624
625 async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
626 let serde_state = SerdeRoundState::from(round_state.state());
627 let record = Record::from_data(
628 partition::ROUND_STATE,
629 &round_state.id().to_bytes(),
630 Some(sort::SortKey::u32_asc(round_state.id().0)),
631 &serde_state,
632 )?;
633 self.inner.write().await.put(record).await
634 }
635
636 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
637 self.inner.write().await
638 .delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
639 Ok(())
640 }
641
642 async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
643 let record = self.inner.read().await
644 .get(partition::ROUND_STATE, &_id.to_bytes()).await?;
645 match record {
646 Some(r) => {
647 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
648 let id = RoundStateId(u32::from_be_bytes(pk_slice));
649 let state = r.to_data::<SerdeRoundState>()?.into();
650 Ok(Some(StoredRoundState::new(id, state)))
651 },
652 None => Ok(None),
653 }
654 }
655
656 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
657 let records = self.inner.read().await
658 .get_all(partition::ROUND_STATE).await?;
659 records.into_iter()
660 .map(|r| {
661 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
662 Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
663 })
664 .collect()
665 }
666
667 async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
668 let mut lock = self.inner.write().await;
669
670 for (vtxo, state) in vtxos {
671 let serde_vtxo = SerdeVtxo {
672 vtxo: (*vtxo).clone(),
673 states: vec![(*state).clone()],
674 };
675
676 let sk = sort::vtxo_sort_key(
677 state.kind(), vtxo.expiry_height(), vtxo.amount(),
678 );
679 let record = Record::from_data(
680 partition::VTXO,
681 &vtxo.id().to_bytes(),
682 Some(sk),
683 &serde_vtxo,
684 )?;
685 lock.put(record).await?;
686 }
687 Ok(())
688 }
689
690 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
691 let lock = self.inner.read().await;
692 match get_vtxo(&*lock, id).await? {
693 Some(serde_vtxo) => {
694 let state = serde_vtxo.current_state()
695 .context("vtxo has no state")?.clone();
696 Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
697 },
698 None => Ok(None),
699 }
700 }
701
702 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
703 let records = self.inner.read().await
704 .query_sorted(Query::new_full_range(partition::VTXO)).await?;
705
706 records
707 .into_iter()
708 .map(|r| {
709 let serde_vtxo = r.to_data::<SerdeVtxo>()?;
710 let state = serde_vtxo
711 .current_state()
712 .cloned()
713 .context("vtxo has no state")?;
714 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
715 })
716 .collect()
717 }
718
719 async fn get_vtxos_by_state(
720 &self,
721 states: &[VtxoStateKind],
722 ) -> anyhow::Result<Vec<WalletVtxo>> {
723 let lock = self.inner.read().await;
724
725 let range = |state: VtxoStateKind| {
726 let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
727 let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
728 (start, end)
729 };
730
731 let mut records = Vec::new();
732 for state in states {
733 let (start, end) = range(*state);
734 let query = Query::new(partition::VTXO, start..=end);
735
736 for record in lock.query_sorted(query).await? {
737 let serde_vtxo = record.to_data::<SerdeVtxo>()?;
738 let current_state = serde_vtxo.current_state()
739 .context("vtxo has no current state")?.clone();
740 debug_assert_eq!(current_state.kind(), *state);
741 records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
742 }
743 }
744
745 Ok(records)
746 }
747
748 async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
749 let lock = self.inner.read().await;
750 Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
751 }
752
753 async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
754 let lock = self.inner.read().await;
755 let mut out = Vec::with_capacity(ids.len());
756 for id in ids {
757 let serde_vtxo = get_vtxo(&*lock, *id).await?
758 .with_context(|| format!("vtxo {id} not found"))?;
759 out.push(serde_vtxo.vtxo);
760 }
761 Ok(out)
762 }
763
764 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
765 match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
766 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
767 None => Ok(None),
768 }
769 }
770
771 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
772 match self.get_wallet_vtxo(id).await? {
773 Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
774 None => Ok(false),
775 }
776 }
777
778 async fn update_vtxo_state_checked(
779 &self,
780 vtxo_id: VtxoId,
781 new_state: VtxoState,
782 allowed_old_states: &[VtxoStateKind],
783 ) -> anyhow::Result<WalletVtxo> {
784 let mut lock = self.inner.write().await;
785 update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
786 }
787
788 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
789 let vtxo_key = SerdeVtxoKey { index, public_key };
790 let record = Record::from_data(
791 partition::PUBLIC_KEY,
792 &public_key.serialize()[..],
793 Some(sort::SortKey::u64_desc(index as u64)),
794 &vtxo_key,
795 )?;
796 self.inner.write().await.put(record).await
797 }
798
799 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
800 let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
802 let records = self.inner.read().await.query_sorted(query).await?;
803
804 match records.into_iter().next() {
805 Some(record) => {
806 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
807 Ok(Some(vtxo_key.index))
808 }
809 None => Ok(None),
810 }
811 }
812
813 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
814 match self.inner.read().await
815 .get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
816 {
817 Some(record) => {
818 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
819 Ok(Some(vtxo_key.index))
820 }
821 None => Ok(None),
822 }
823 }
824
825 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
826 match self.inner.read().await
827 .get(partition::MAILBOX_CHECKPOINT, &[]).await?
828 {
829 Some(record) => Ok(record.to_data::<u64>()?),
830 None => Ok(0),
831 }
832 }
833
834 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
835 let mut lock = self.inner.write().await;
836 let record = Record::from_data(
837 partition::MAILBOX_CHECKPOINT,
838 &[],
839 None,
840 &checkpoint,
841 )?;
842 lock.put(record).await?;
843 Ok(())
844 }
845
846 async fn upsert_wallet_action_checkpoint(
847 &self,
848 id: &WalletActionId,
849 checkpoint: &WalletActionCheckpoint,
850 ) -> anyhow::Result<()> {
851 let record = Record::from_data(
852 partition::WALLET_ACTION_CHECKPOINT,
853 id.as_bytes(),
854 None,
855 checkpoint,
856 )?;
857 self.inner.write().await.put(record).await
858 }
859
860 async fn get_wallet_action_checkpoint(
861 &self,
862 id: &WalletActionId,
863 ) -> anyhow::Result<Option<WalletActionCheckpoint>> {
864 match self.inner.read().await
865 .get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
866 {
867 Some(record) => Ok(Some(record.to_data()?)),
868 None => Ok(None),
869 }
870 }
871
872 async fn get_all_wallet_action_checkpoints(
873 &self,
874 ) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
875 let records = self.inner.read().await
876 .get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
877 records.into_iter().map(|r| r.to_data()).collect()
878 }
879
880 async fn remove_wallet_action_checkpoint(
881 &self,
882 id: &WalletActionId,
883 ) -> anyhow::Result<()> {
884 self.inner.write().await
885 .delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
886 Ok(())
887 }
888
889 async fn record_paid_invoice(
890 &self,
891 payment_hash: PaymentHash,
892 preimage: Preimage,
893 ) -> anyhow::Result<()> {
894 let key = payment_hash.to_byte_array();
895 let mut lock = self.inner.write().await;
897 if lock.get(partition::PAID_INVOICE, &key).await?.is_some() {
898 return Ok(());
899 }
900 let paid = PaidInvoice {
901 payment_hash,
902 preimage,
903 paid_at: chrono::Local::now(),
904 };
905 let record = Record::from_data(partition::PAID_INVOICE, &key, None, &paid)?;
906 lock.put(record).await
907 }
908
909 async fn get_paid_invoice(
910 &self,
911 payment_hash: PaymentHash,
912 ) -> anyhow::Result<Option<PaidInvoice>> {
913 match self.inner.read().await
914 .get(partition::PAID_INVOICE, &payment_hash.to_byte_array()).await?
915 {
916 Some(record) => Ok(Some(record.to_data()?)),
917 None => Ok(None),
918 }
919 }
920
921 async fn store_lightning_receive(
922 &self,
923 payment_hash: PaymentHash,
924 preimage: Preimage,
925 invoice: &Bolt11Invoice,
926 htlc_recv_cltv_delta: BlockDelta,
927 ) -> anyhow::Result<()> {
928 let lightning_receive = LightningReceive {
929 payment_hash,
930 payment_preimage: preimage,
931 invoice: invoice.clone(),
932 htlc_recv_cltv_delta,
933 htlc_vtxos: vec![],
934 movement_id: None,
935 finished_at: None,
936 preimage_revealed_at: None,
937 };
938
939 let record = Record::from_data(
940 partition::LIGHTNING_RECEIVE,
941 &payment_hash.to_byte_array(),
942 None,
943 &lightning_receive,
944 )?;
945 self.inner.write().await.put(record).await
946 }
947
948 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
949 let records = self.inner.read().await
950 .get_all(partition::LIGHTNING_RECEIVE).await?;
951 records
952 .into_iter()
953 .filter_map(|r| {
954 let receive = r.to_data::<LightningReceive>().ok()?;
955 if receive.finished_at.is_none() {
956 Some(Ok(receive))
957 } else {
958 None
959 }
960 })
961 .collect()
962 }
963
964 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
965 let mut lock = self.inner.write().await;
966
967 let pk = payment_hash.to_byte_array();
968 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
969 .context("lightning receive not found")?;
970 let mut lightning_receive: LightningReceive = record.to_data()?;
971
972 lightning_receive.preimage_revealed_at = Some(Local::now());
973
974 let updated_record = Record::from_data(
975 partition::LIGHTNING_RECEIVE,
976 &pk,
977 None,
978 &lightning_receive,
979 )?;
980 lock.put(updated_record).await
981 }
982
983 async fn update_lightning_receive(
984 &self,
985 payment_hash: PaymentHash,
986 vtxo_ids: &[VtxoId],
987 movement_id: MovementId,
988 ) -> anyhow::Result<()> {
989 let mut lock = self.inner.write().await;
990 let pk = payment_hash.to_byte_array();
991 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
992 .context("lightning receive not found")?;
993 let mut lightning_receive: LightningReceive = record.to_data()?;
994
995 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
996 for vtxo_id in vtxo_ids {
997 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
998 .context("vtxo not found")?;
999 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
1000 }
1001
1002 lightning_receive.htlc_vtxos = htlc_vtxos;
1003 lightning_receive.movement_id = Some(movement_id);
1004
1005 let updated_record = Record::from_data(
1006 partition::LIGHTNING_RECEIVE,
1007 &pk,
1008 None,
1009 &lightning_receive,
1010 )?;
1011 lock.put(updated_record).await
1012 }
1013
1014 async fn fetch_lightning_receive_by_payment_hash(
1015 &self,
1016 payment_hash: PaymentHash,
1017 ) -> anyhow::Result<Option<LightningReceive>> {
1018 match self.inner.read().await
1019 .get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1020 {
1021 Some(record) => Ok(Some(record.to_data()?)),
1022 None => Ok(None),
1023 }
1024 }
1025
1026 async fn finish_pending_lightning_receive(
1027 &self,
1028 payment_hash: PaymentHash,
1029 ) -> anyhow::Result<()> {
1030 let mut lock = self.inner.write().await;
1031 let pk = payment_hash.to_byte_array();
1032 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1033 .context("lightning receive not found")?;
1034 let mut lightning_receive: LightningReceive = record.to_data()?;
1035
1036 lightning_receive.finished_at = Some(Local::now());
1037
1038 let updated_record = Record::from_data(
1039 partition::LIGHTNING_RECEIVE,
1040 &pk,
1041 None,
1042 &lightning_receive,
1043 )?;
1044 lock.put(updated_record).await
1045 }
1046
1047 async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1048 let record = Record::from_data(
1049 partition::PENDING_OFFBOARD,
1050 &pending.movement_id.to_bytes(),
1051 None,
1052 pending,
1053 )?;
1054 self.inner.write().await.put(record).await
1055 }
1056
1057 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1058 let records = self.inner.read().await
1059 .get_all(partition::PENDING_OFFBOARD).await?;
1060 records.into_iter().map(|r| r.to_data()).collect()
1061 }
1062
1063 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1064 self.inner.write().await
1065 .delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1066 Ok(())
1067 }
1068
1069 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1070 let record = Record::from_data(
1071 partition::EXIT_VTXO,
1072 &exit.vtxo_id.to_bytes(),
1073 None,
1074 exit,
1075 )?;
1076 self.inner.write().await.put(record).await
1077 }
1078
1079 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1080 self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1081 Ok(())
1082 }
1083
1084 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1085 let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1086 records.into_iter().map(|r| r.to_data()).collect()
1087 }
1088
1089 async fn store_exit_child_tx(
1090 &self,
1091 exit_txid: Txid,
1092 child_tx: &Transaction,
1093 origin: ExitTxOrigin,
1094 ) -> anyhow::Result<()> {
1095 let exit_child = SerdeExitChildTx {
1096 child_tx: child_tx.clone(),
1097 origin,
1098 };
1099 let record = Record::from_data(
1100 partition::EXIT_CHILD_TX,
1101 &exit_txid.to_byte_array(),
1102 None,
1103 &exit_child,
1104 )?;
1105 self.inner.write().await.put(record).await
1106 }
1107
1108 async fn get_exit_child_tx(
1109 &self,
1110 exit_txid: Txid,
1111 ) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1112 match self.inner.read().await
1113 .get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1114 {
1115 Some(record) => {
1116 let exit_child = record.to_data::<SerdeExitChildTx>()?;
1117 Ok(Some((exit_child.child_tx, exit_child.origin)))
1118 }
1119 None => Ok(None),
1120 }
1121 }
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126 use super::*;
1127
1128 #[test]
1129 fn storage_query_builder() {
1130 let query = Query::new_full_range(0).limit(10);
1131
1132 assert_eq!(query.partition, 0);
1133 assert_eq!(query.limit, Some(10));
1134 assert_eq!(query.range, ..);
1135 }
1136}
1137
1138#[cfg(test)]
1153pub mod test_suite {
1154 use super::*;
1155 use super::partition::LAST_IDS;
1156 use super::sort::SortKey;
1157
1158 async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1159 for partition in partitions {
1160 let records = storage.get_all(*partition).await?;
1161 for record in records {
1162 storage.delete(record.partition, &record.pk).await?;
1163 }
1164 }
1165 Ok(())
1166 }
1167
1168 pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1170 test_put_insert(storage).await;
1172 test_put_upsert(storage).await;
1173 test_put_with_sort_key(storage).await;
1174 test_put_without_sort_key(storage).await;
1175 test_put_multiple_partitions(storage).await;
1176
1177 test_get_existing(storage).await;
1179 test_get_after_update(storage).await;
1180
1181 test_delete_existing(storage).await;
1183 test_delete_nonexistent(storage).await;
1184 test_delete_idempotent(storage).await;
1185
1186 test_query_empty_partition(storage).await;
1188 test_query_returns_partition_records(storage).await;
1189 test_query_ordering(storage).await;
1190 test_query_with_limit(storage).await;
1191 test_query_null_sort_key_excluded(storage).await;
1192 test_query_partition_isolation(storage).await;
1193 test_query_range(storage).await;
1194 test_query_exclusive_end_range(storage).await;
1195 test_query_full_range_limit_one(storage).await;
1196
1197 test_get_all_empty_partition(storage).await;
1199 test_get_all_returns_all_records(storage).await;
1200 test_get_all_includes_records_without_sort_key(storage).await;
1201 test_get_all_partition_isolation(storage).await;
1202 test_get_all_after_delete(storage).await;
1203
1204 test_incremental_id_starts_at_one(storage).await;
1206 test_incremental_id_increments(storage).await;
1207 test_incremental_id_partition_isolation(storage).await;
1208 test_incremental_id_persists_across_operations(storage).await;
1209 }
1210
1211 pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1213 let record = Record {
1214 pk: "put_insert_1".into(),
1215 partition: 0,
1216 sort_key: None,
1217 data: b"test data".to_vec(),
1218 };
1219
1220 storage.put(record).await.expect("put should succeed");
1221
1222 let retrieved = storage
1223 .get(0, b"put_insert_1")
1224 .await
1225 .expect("get should succeed")
1226 .expect("record should exist");
1227
1228 assert_eq!(retrieved.pk, b"put_insert_1");
1229 assert_eq!(retrieved.partition, 0);
1230 assert_eq!(retrieved.data, b"test data");
1231 }
1232
1233 pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1235 let record1 = Record {
1236 pk: b"put_upsert_1".into(),
1237 partition: 0,
1238 sort_key: None,
1239 data: b"original".to_vec(),
1240 };
1241 storage.put(record1).await.expect("first put should succeed");
1242
1243 let record2 = Record {
1244 pk: "put_upsert_1".into(),
1245 partition: 0,
1246 sort_key: None,
1247 data: b"updated".to_vec(),
1248 };
1249 storage
1250 .put(record2)
1251 .await
1252 .expect("second put should succeed");
1253
1254 let retrieved = storage
1255 .get(0, b"put_upsert_1")
1256 .await
1257 .expect("get should succeed")
1258 .expect("record should exist");
1259
1260 assert_eq!(retrieved.data, b"updated", "data should be updated");
1261 }
1262
1263 pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1265 let sort_key = SortKey::u32_asc(42);
1266 let record = Record {
1267 pk: b"put_sort_key_1".into(),
1268 partition: 0,
1269 sort_key: Some(sort_key.clone()),
1270 data: b"with sort key".to_vec(),
1271 };
1272
1273 storage.put(record).await.expect("put should succeed");
1274
1275 let retrieved = storage
1276 .get(0, b"put_sort_key_1")
1277 .await
1278 .expect("get should succeed")
1279 .expect("record should exist");
1280
1281 assert_eq!(retrieved.sort_key, Some(sort_key));
1282 }
1283
1284 pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1286 let record = Record {
1287 pk: b"put_no_sort_key_1".into(),
1288 partition: 0,
1289 sort_key: None,
1290 data: b"no sort key".to_vec(),
1291 };
1292
1293 storage.put(record).await.expect("put should succeed");
1294
1295 let retrieved = storage
1296 .get(0, b"put_no_sort_key_1")
1297 .await
1298 .expect("get should succeed")
1299 .expect("record should exist");
1300
1301 assert!(retrieved.sort_key.is_none());
1302 }
1303
1304 pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1306 let record_a = Record {
1307 pk: "put_multi_a".into(),
1308 partition: 0,
1309 sort_key: None,
1310 data: b"in partition a".to_vec(),
1311 };
1312 let record_b = Record {
1313 pk: "put_multi_b".into(),
1314 partition: 1,
1315 sort_key: None,
1316 data: b"in partition b".to_vec(),
1317 };
1318
1319 storage.put(record_a).await.expect("put a should succeed");
1320 storage.put(record_b).await.expect("put b should succeed");
1321
1322 let retrieved_a = storage
1323 .get(0, b"put_multi_a")
1324 .await
1325 .expect("get should succeed")
1326 .expect("record a should exist");
1327 let retrieved_b = storage
1328 .get(1, b"put_multi_b")
1329 .await
1330 .expect("get should succeed")
1331 .expect("record b should exist");
1332
1333 assert_eq!(retrieved_a.partition, 0);
1334 assert_eq!(retrieved_b.partition, 1);
1335 }
1336
1337 pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1339 let record = Record {
1340 pk: b"get_existing_1".into(),
1341 partition: 0,
1342 sort_key: Some(SortKey::u32_asc(100)),
1343 data: b"test".to_vec(),
1344 };
1345 storage.put(record).await.expect("put should succeed");
1346
1347 let retrieved = storage
1348 .get(0, b"get_existing_1")
1349 .await
1350 .expect("get should succeed");
1351
1352 assert!(retrieved.is_some());
1353 let retrieved = retrieved.unwrap();
1354 assert_eq!(retrieved.pk, b"get_existing_1");
1355 assert_eq!(retrieved.partition, 0);
1356 assert_eq!(retrieved.data, b"test");
1357
1358 assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1360 assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1362
1363 assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1365 }
1366
1367 pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1369 let record1 = Record {
1370 pk: b"get_after_update_1".into(),
1371 partition: 0,
1372 sort_key: None,
1373 data: b"version1".to_vec(),
1374 };
1375 storage.put(record1).await.expect("put should succeed");
1376
1377 let record2 = Record {
1378 pk: b"get_after_update_1".into(),
1379 partition: 0,
1380 sort_key: None,
1381 data: b"version2".to_vec(),
1382 };
1383 storage.put(record2).await.expect("put should succeed");
1384
1385 let retrieved = storage
1386 .get(0, b"get_after_update_1")
1387 .await
1388 .expect("get should succeed")
1389 .expect("record should exist");
1390
1391 assert_eq!(retrieved.data, b"version2");
1392 }
1393
1394 pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1396 let record = Record {
1397 pk: b"delete_existing_1".into(),
1398 partition: 0,
1399 sort_key: None,
1400 data: b"to delete".to_vec(),
1401 };
1402 storage.put(record.clone()).await.expect("put should succeed");
1403
1404 let deleted_record = storage
1405 .delete(0, b"delete_existing_1")
1406 .await
1407 .expect("delete should succeed");
1408
1409 assert_eq!(deleted_record, Some(record));
1410
1411 let retrieved = storage
1412 .get(0, b"delete_existing_1")
1413 .await
1414 .expect("get should succeed");
1415 assert!(retrieved.is_none(), "record should no longer exist");
1416 }
1417
1418 pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1420 let deleted_record = storage
1421 .delete(0, b"delete_nonexistent_does_not_exist")
1422 .await
1423 .expect("delete should succeed");
1424
1425 assert!(
1426 deleted_record.is_none(),
1427 "delete should return None for non-existent record"
1428 );
1429 }
1430
1431 pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1433 let record = Record {
1434 pk: b"delete_idempotent_1".into(),
1435 partition: 0,
1436 sort_key: None,
1437 data: b"delete twice".to_vec(),
1438 };
1439 storage.put(record.clone()).await.expect("put should succeed");
1440
1441 let first_delete = storage
1442 .delete(0, b"delete_idempotent_1")
1443 .await
1444 .expect("first delete should succeed");
1445 let second_delete = storage
1446 .delete(0, b"delete_idempotent_1")
1447 .await
1448 .expect("second delete should succeed");
1449
1450 assert_eq!(first_delete, Some(record), "first delete should return the record");
1451 assert_eq!(second_delete, None, "second delete should return None");
1452 }
1453
1454 pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1456 clear_partitions(storage, &[0]).await.unwrap();
1457 let results = storage
1458 .query_sorted(Query::new_full_range(0))
1459 .await
1460 .expect("query should succeed");
1461
1462 assert!(results.is_empty());
1463 }
1464
1465 pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1467 clear_partitions(storage, &[0]).await.unwrap();
1468 for i in 0..3 {
1469 let record = Record {
1470 pk: format!("query_partition_{}", i).into(),
1471 partition: 0,
1472 sort_key: Some(SortKey::u32_asc(i)),
1473 data: format!("record_{}", i).as_bytes().to_vec(),
1474 };
1475 storage.put(record).await.expect("put should succeed");
1476 }
1477
1478 let results = storage
1479 .query_sorted(Query::new_full_range(0))
1480 .await
1481 .expect("query should succeed");
1482
1483 assert_eq!(results.len(), 3);
1484 }
1485
1486 pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1488 clear_partitions(storage, &[0]).await.unwrap();
1489 for i in [5, 2, 8, 1, 9] {
1491 let record = Record {
1492 pk: format!("query_asc_{}", i).into(),
1493 partition: 0,
1494 sort_key: Some(SortKey::u32_asc(i)),
1495 data: format!("record_{}", i).as_bytes().to_vec(),
1496 };
1497 storage.put(record).await.expect("put should succeed");
1498 }
1499
1500 let results = storage
1501 .query_sorted(Query::new_full_range(0))
1502 .await
1503 .expect("query should succeed");
1504
1505 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1506 assert_eq!(
1507 values,
1508 vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1509 "should be in ascending order"
1510 );
1511 }
1512
1513 pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1515 clear_partitions(storage, &[0]).await.unwrap();
1516 for i in 0..10 {
1517 let record = Record {
1518 pk: format!("query_limit_{}", i).into(),
1519 partition: 0,
1520 sort_key: Some(SortKey::u32_asc(i)),
1521 data: format!("record_{}", i).as_bytes().to_vec(),
1522 };
1523 storage.put(record).await.expect("put should succeed");
1524 }
1525
1526 let results = storage
1527 .query_sorted(Query::new_full_range(0).limit(3))
1528 .await
1529 .expect("query should succeed");
1530
1531 assert_eq!(results.len(), 3);
1532 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1533 assert_eq!(
1534 values,
1535 vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1536 "should return first 3 records"
1537 );
1538 }
1539
1540 pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1542 clear_partitions(storage, &[0]).await.unwrap();
1543 let with_key_1 = Record {
1545 pk: "query_null_with_1".into(),
1546 partition: 0,
1547 sort_key: Some(SortKey::u32_asc(1)),
1548 data: b"with_key_1".to_vec(),
1549 };
1550 let with_key_2 = Record {
1551 pk: "query_null_with_2".into(),
1552 partition: 0,
1553 sort_key: Some(SortKey::u32_asc(2)),
1554 data: b"with_key_2".to_vec(),
1555 };
1556
1557 let without_key = Record {
1559 pk: "query_null_without".into(),
1560 partition: 0,
1561 sort_key: None,
1562 data: b"no_key".to_vec(),
1563 };
1564
1565 storage.put(with_key_1).await.expect("put should succeed");
1566 storage.put(without_key).await.expect("put should succeed");
1567 storage.put(with_key_2).await.expect("put should succeed");
1568
1569 let results_query = storage.query_sorted(Query::new_full_range(0)).await
1571 .expect("query should succeed");
1572 assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1573 assert_eq!(results_query[0].data, b"with_key_1");
1574 assert_eq!(results_query[1].data, b"with_key_2");
1575
1576 let results_all = storage.get_all(0).await
1578 .expect("get_all should succeed");
1579 assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1580
1581 let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1583 let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1584 let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1585 assert!(has_with_key_1, "get_all should include with_key_1");
1586 assert!(has_with_key_2, "get_all should include with_key_2");
1587 assert!(has_without_key, "get_all should include record without sort key");
1588 }
1589
1590 pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1592 clear_partitions(storage, &[0, 1]).await.unwrap();
1593 for i in 0..3 {
1595 let record = Record {
1596 pk: format!("query_iso_a_{}", i).into(),
1597 partition: 0,
1598 sort_key: Some(SortKey::u32_asc(i)),
1599 data: format!("record_{}", i).as_bytes().to_vec(),
1600 };
1601 storage.put(record).await.expect("put should succeed");
1602 }
1603
1604 for i in 0..5 {
1606 let record = Record {
1607 pk: format!("query_iso_b_{}", i).into(),
1608 partition: 1,
1609 sort_key: Some(SortKey::u32_asc(i)),
1610 data: format!("record_{}", i + 100).as_bytes().to_vec(),
1611 };
1612 storage.put(record).await.expect("put should succeed");
1613 }
1614
1615 let results_a = storage
1616 .query_sorted(Query::new_full_range(0))
1617 .await
1618 .expect("query should succeed");
1619
1620 let results_b = storage
1621 .query_sorted(Query::new_full_range(1))
1622 .await
1623 .expect("query should succeed");
1624
1625 assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1626 assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1627
1628 assert!(results_a
1630 .iter()
1631 .all(|r| r.partition == 0));
1632
1633 assert!(results_b
1635 .iter()
1636 .all(|r| r.partition == 1));
1637 }
1638
1639 pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1641 clear_partitions(storage, &[0]).await.unwrap();
1642
1643 for i in 1..=10u32 {
1645 let record = Record {
1646 pk: format!("query_range_{}", i).into(),
1647 partition: 0,
1648 sort_key: Some(SortKey::u32_asc(i)),
1649 data: format!("record_{}", i).as_bytes().to_vec(),
1650 };
1651 storage.put(record).await.expect("put should succeed");
1652 }
1653
1654 let results_start = storage
1656 .query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1657 .await
1658 .expect("query should succeed");
1659
1660 assert_eq!(results_start.len(), 6, "should return records 5-10");
1661 let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1662 assert_eq!(
1663 values,
1664 vec![
1665 b"record_5".to_vec(),
1666 b"record_6".to_vec(),
1667 b"record_7".to_vec(),
1668 b"record_8".to_vec(),
1669 b"record_9".to_vec(),
1670 b"record_10".to_vec(),
1671 ],
1672 "should return records from 5 onwards"
1673 );
1674
1675 let results_end = storage
1677 .query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1678 .await
1679 .expect("query should succeed");
1680
1681 assert_eq!(results_end.len(), 3, "should return records 1-3");
1682 let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1683 assert_eq!(
1684 values,
1685 vec![
1686 b"record_1".to_vec(),
1687 b"record_2".to_vec(),
1688 b"record_3".to_vec(),
1689 ],
1690 "should return records up to 3"
1691 );
1692
1693 let results_range = storage
1695 .query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1696 .await
1697 .expect("query should succeed");
1698
1699 assert_eq!(results_range.len(), 5, "should return records 3-7");
1700 let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1701 assert_eq!(
1702 values,
1703 vec![
1704 b"record_3".to_vec(),
1705 b"record_4".to_vec(),
1706 b"record_5".to_vec(),
1707 b"record_6".to_vec(),
1708 b"record_7".to_vec(),
1709 ],
1710 "should return records in range 3-7"
1711 );
1712
1713 let results_range_limit = storage
1715 .query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1716 .await
1717 .expect("query should succeed");
1718
1719 assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1720 let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1721 assert_eq!(
1722 values,
1723 vec![
1724 b"record_2".to_vec(),
1725 b"record_3".to_vec(),
1726 b"record_4".to_vec(),
1727 ],
1728 "should return first 3 records in range"
1729 );
1730
1731 let results_empty = storage
1733 .query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1734 .await
1735 .expect("query should succeed");
1736
1737 assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1738 }
1739
1740 pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1742 clear_partitions(storage, &[0]).await.unwrap();
1743
1744 for i in 1..=10u32 {
1745 let record = Record {
1746 pk: format!("query_excl_{}", i).into(),
1747 partition: 0,
1748 sort_key: Some(SortKey::u32_asc(i)),
1749 data: format!("record_{}", i).as_bytes().to_vec(),
1750 };
1751 storage.put(record).await.expect("put should succeed");
1752 }
1753
1754 let results = storage
1756 .query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1757 .await
1758 .expect("query should succeed");
1759
1760 assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1761 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1762 assert_eq!(
1763 values,
1764 vec![
1765 b"record_3".to_vec(),
1766 b"record_4".to_vec(),
1767 b"record_5".to_vec(),
1768 b"record_6".to_vec(),
1769 ],
1770 );
1771
1772 let results = storage
1774 .query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1775 .await
1776 .expect("query should succeed");
1777
1778 assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1779 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1780 assert_eq!(
1781 values,
1782 vec![
1783 b"record_1".to_vec(),
1784 b"record_2".to_vec(),
1785 b"record_3".to_vec(),
1786 ],
1787 );
1788 }
1789
1790 pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1792 clear_partitions(storage, &[0]).await.unwrap();
1793
1794 for i in [5u32, 2, 8, 1, 9] {
1795 let record = Record {
1796 pk: format!("query_limit1_{}", i).into(),
1797 partition: 0,
1798 sort_key: Some(SortKey::u32_asc(i)),
1799 data: format!("record_{}", i).as_bytes().to_vec(),
1800 };
1801 storage.put(record).await.expect("put should succeed");
1802 }
1803
1804 let results = storage
1805 .query_sorted(Query::new_full_range(0).limit(1))
1806 .await
1807 .expect("query should succeed");
1808
1809 assert_eq!(results.len(), 1);
1810 assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1811 }
1812
1813 pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1815 storage.delete(LAST_IDS, b"0").await.unwrap();
1817 let id = storage.incremental_id(0).await
1818 .expect("incremental_id should succeed");
1819
1820 assert_eq!(id, 1, "first id should be 1");
1821 }
1822
1823 pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1825 clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1826
1827 let id1 = storage.incremental_id(0).await
1828 .expect("incremental_id should succeed");
1829 let id2 = storage.incremental_id(0).await
1830 .expect("incremental_id should succeed");
1831 let id3 = storage.incremental_id(0).await
1832 .expect("incremental_id should succeed");
1833
1834 assert_eq!(id1, 1, "first id should be 1");
1835 assert_eq!(id2, 2, "second id should be 2");
1836 assert_eq!(id3, 3, "third id should be 3");
1837 }
1838
1839 pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1841 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1842
1843 let a1 = storage.incremental_id(0).await
1845 .expect("incremental_id should succeed");
1846 let a2 = storage.incremental_id(0).await
1847 .expect("incremental_id should succeed");
1848 let a3 = storage.incremental_id(0).await
1849 .expect("incremental_id should succeed");
1850
1851 let b1 = storage.incremental_id(1).await
1853 .expect("incremental_id should succeed");
1854 let b2 = storage.incremental_id(1).await
1855 .expect("incremental_id should succeed");
1856
1857 assert_eq!(a1, 1);
1859 assert_eq!(a2, 2);
1860 assert_eq!(a3, 3);
1861
1862 assert_eq!(b1, 1);
1864 assert_eq!(b2, 2);
1865
1866 let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1868 assert_eq!(a4, 4);
1869 }
1870
1871 pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1873 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1874
1875 let id1 = storage.incremental_id(0).await
1877 .expect("incremental_id should succeed");
1878 let id2 = storage.incremental_id(0).await
1879 .expect("incremental_id should succeed");
1880 assert_eq!(id1, 1);
1881 assert_eq!(id2, 2);
1882
1883 let stored = storage
1885 .get(LAST_IDS, &[0])
1886 .await
1887 .expect("get should succeed")
1888 .expect("id record should exist");
1889 let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1890 assert_eq!(stored_id, 2, "stored id should be 2");
1891
1892 let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1894 assert_eq!(id3, 3);
1895 }
1896
1897 pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1899 clear_partitions(storage, &[0]).await.unwrap();
1900 let results = storage
1901 .get_all(0)
1902 .await
1903 .expect("get_all should succeed");
1904
1905 assert!(results.is_empty(), "get_all should return empty for empty partition");
1906 }
1907
1908 pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1910 clear_partitions(storage, &[0]).await.unwrap();
1911
1912 for i in 0..5 {
1914 let record = Record {
1915 pk: format!("get_all_{}", i).into(),
1916 partition: 0,
1917 sort_key: Some(SortKey::u32_asc(i)),
1918 data: format!("record_{}", i).as_bytes().to_vec(),
1919 };
1920 storage.put(record).await.expect("put should succeed");
1921 }
1922
1923 let results = storage
1924 .get_all(0)
1925 .await
1926 .expect("get_all should succeed");
1927
1928 assert_eq!(results.len(), 5, "get_all should return all 5 records");
1929
1930 for i in 0..5 {
1932 let expected_data = format!("record_{}", i).as_bytes().to_vec();
1933 let found = results.iter().any(|r| r.data == expected_data);
1934 assert!(found, "get_all should include record_{}", i);
1935 }
1936 }
1937
1938 pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1940 clear_partitions(storage, &[0]).await.unwrap();
1941
1942 let with_key_1 = Record {
1944 pk: "get_all_with_1".into(),
1945 partition: 0,
1946 sort_key: Some(SortKey::u32_asc(1)),
1947 data: b"with_key_1".to_vec(),
1948 };
1949 let with_key_2 = Record {
1950 pk: "get_all_with_2".into(),
1951 partition: 0,
1952 sort_key: Some(SortKey::u32_asc(2)),
1953 data: b"with_key_2".to_vec(),
1954 };
1955
1956 let without_key_1 = Record {
1958 pk: "get_all_without_1".into(),
1959 partition: 0,
1960 sort_key: None,
1961 data: b"without_key_1".to_vec(),
1962 };
1963 let without_key_2 = Record {
1964 pk: "get_all_without_2".into(),
1965 partition: 0,
1966 sort_key: None,
1967 data: b"without_key_2".to_vec(),
1968 };
1969
1970 storage.put(with_key_1).await.expect("put should succeed");
1971 storage.put(without_key_1).await.expect("put should succeed");
1972 storage.put(with_key_2).await.expect("put should succeed");
1973 storage.put(without_key_2).await.expect("put should succeed");
1974
1975 let results = storage
1976 .get_all(0)
1977 .await
1978 .expect("get_all should succeed");
1979
1980 assert_eq!(results.len(), 4, "get_all should return all 4 records");
1981
1982 let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1984 let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1985 let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1986 let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1987
1988 assert!(has_with_1, "get_all should include with_key_1");
1989 assert!(has_with_2, "get_all should include with_key_2");
1990 assert!(has_without_1, "get_all should include without_key_1");
1991 assert!(has_without_2, "get_all should include without_key_2");
1992
1993 let query_results = storage
1995 .query_sorted(Query::new_full_range(0))
1996 .await
1997 .expect("query should succeed");
1998
1999 assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
2000 let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
2001 assert!(!query_has_without, "query should not include records without sort keys");
2002 }
2003
2004 pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2006 clear_partitions(storage, &[0, 1]).await.unwrap();
2007
2008 for i in 0..3 {
2010 let record = Record {
2011 pk: format!("partition_0_{}", i).into(),
2012 partition: 0,
2013 sort_key: Some(SortKey::u32_asc(i)),
2014 data: format!("p0_record_{}", i).as_bytes().to_vec(),
2015 };
2016 storage.put(record).await.expect("put should succeed");
2017 }
2018
2019 for i in 0..2 {
2021 let record = Record {
2022 pk: format!("partition_1_{}", i).into(),
2023 partition: 1,
2024 sort_key: Some(SortKey::u32_asc(i)),
2025 data: format!("p1_record_{}", i).as_bytes().to_vec(),
2026 };
2027 storage.put(record).await.expect("put should succeed");
2028 }
2029
2030 let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2032 assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2033 assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2034
2035 let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2037 assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2038 assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2039 }
2040
2041 pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2043 clear_partitions(storage, &[0]).await.unwrap();
2044
2045 for i in 0..3u32 {
2046 let record = Record {
2047 pk: format!("get_all_del_{}", i).into(),
2048 partition: 0,
2049 sort_key: Some(SortKey::u32_asc(i)),
2050 data: format!("record_{}", i).as_bytes().to_vec(),
2051 };
2052 storage.put(record).await.expect("put should succeed");
2053 }
2054
2055 storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2056
2057 let results = storage.get_all(0).await.expect("get_all should succeed");
2058 assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2059
2060 let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2061 assert!(!has_deleted, "deleted record should not appear");
2062 }
2063}