1mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::{SortKey, SortKeyBuilder};
45
46#[cfg(feature = "indexed-db")]
47pub mod indexed_db;
48
49use std::ops::RangeBounds;
50
51use anyhow::Context;
52use bitcoin::{Amount, Transaction, Txid};
53use bitcoin::secp256k1::PublicKey;
54use bitcoin::hashes::Hash;
55#[cfg(feature = "onchain-bdk")]
56use bdk_core::Merge;
57#[cfg(feature = "onchain-bdk")]
58use bdk_wallet::ChangeSet;
59use chrono::{DateTime, Local};
60use lightning_invoice::Bolt11Invoice;
61use serde::{de::DeserializeOwned, Serialize};
62
63use ark::lightning::{Invoice, PaymentHash, Preimage};
64use ark::{Vtxo, VtxoId};
65use ark::vtxo::Full;
66use bitcoin_ext::BlockDelta;
67
68use crate::actions::{WalletActionCheckpoint, WalletActionId};
69use crate::exit::ExitTxOrigin;
70use crate::movement::{
71 Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
72};
73use crate::persist::BarkPersister;
74use crate::persist::models::{
75 LightningReceive, LightningSend, PendingBoard, PendingOffboard,
76 RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
77 StoredRoundState, Unlocked, wallet_vtxo_from_full,
78};
79use crate::round::RoundState;
80use crate::vtxo::{VtxoState, VtxoStateKind};
81use crate::{WalletProperties, WalletVtxo};
82
83
/// Partition identifiers.
///
/// Every `Record` carries one of these `u8` tags in its `partition` field;
/// the persister uses a separate tag for each kind of stored object.
pub mod partition {
    /// Wallet properties, stored under the empty primary key.
    pub const PROPERTIES: u8 = 0;
    /// Merged BDK changeset, stored under the empty primary key.
    // Only referenced when the `onchain-bdk` feature is enabled.
    #[allow(unused)]
    pub const BDK_CHANGESET: u8 = 1;
    /// VTXOs together with their state history, keyed by VTXO id.
    pub const VTXO: u8 = 2;
    /// VTXO keys, keyed by the serialized public key.
    pub const PUBLIC_KEY: u8 = 3;
    /// Pending boards, keyed by VTXO id.
    pub const PENDING_BOARD: u8 = 4;
    /// Round states, keyed by round state id.
    pub const ROUND_STATE: u8 = 5;
    /// Movements, keyed by movement id.
    pub const MOVEMENT: u8 = 6;
    /// Lightning sends, keyed by payment hash.
    pub const LIGHTNING_SEND: u8 = 7;
    /// Lightning receives, keyed by payment hash.
    pub const LIGHTNING_RECEIVE: u8 = 8;
    /// Exit entries, keyed by VTXO id.
    pub const EXIT_VTXO: u8 = 9;
    /// Exit child transactions, keyed by exit txid.
    pub const EXIT_CHILD_TX: u8 = 10;
    /// Mailbox checkpoint, stored under the empty primary key.
    pub const MAILBOX_CHECKPOINT: u8 = 11;
    /// Pending offboards, keyed by movement id.
    pub const PENDING_OFFBOARD: u8 = 12;
    /// Index from serialized payment method to movement id.
    pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;
    /// Wallet action checkpoints, keyed by wallet action id.
    pub const WALLET_ACTION_CHECKPOINT: u8 = 14;

    /// Bookkeeping partition holding the last id handed out per partition;
    /// the primary key is the single byte of the partition being counted.
    pub const LAST_IDS: u8 = u8::MAX;
}
106
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109pub struct Record {
110 pub partition: u8,
114
115 pub pk: Vec<u8>,
117
118 pub sort_key: Option<SortKey>,
127
128 pub data: Vec<u8>,
130}
131
132impl Record {
133 fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
135 serde_json::from_slice(&self.data).map_err(Into::into)
136 }
137
138 fn from_data<T: Serialize>(
140 partition: u8,
141 pk: &[u8],
142 sort_key: Option<SortKey>,
143 data: &T,
144 ) -> anyhow::Result<Record> {
145 Ok(Record {
146 partition,
147 pk: pk.to_vec(),
148 sort_key,
149 data: serde_json::to_vec(data)?,
150 })
151 }
152}
153
154pub trait QueryRange: RangeBounds<SortKey> + Send {}
156
157impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
158
159#[derive(Debug, Clone)]
161pub struct Query<R: QueryRange> {
162 pub partition: u8,
164
165 pub range: R,
167
168 pub limit: Option<usize>,
170}
171
172impl<R: QueryRange> Query<R> {
173 pub fn new(partition: u8, range: R) -> Self {
175 Self {
176 partition,
177 range,
178 limit: None,
179 }
180 }
181
182 pub fn limit(mut self, limit: usize) -> Self {
187 self.limit = Some(limit);
188 self
189 }
190}
191
192impl Query<std::ops::RangeFull> {
193 pub fn new_full_range(partition: u8) -> Self {
194 Self::new(partition, ..)
195 }
196}
197
198fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
199 let body = pm.value_string();
200
201 let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
202 buf.extend(pm.type_str().as_bytes().iter().copied());
203 buf.push(0xfe);
206 buf.extend(body.into_bytes());
207 buf
208}
209
210#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
257#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
258pub trait StorageAdaptor: Send + Sync + 'static {
259 async fn put(&mut self, record: Record) -> anyhow::Result<()>;
261
262 async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
266
267 async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
271
272 async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
278 -> anyhow::Result<Vec<Record>>;
279
280 async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
284
285 async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
287 let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
288 .map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
289 let next_partition_id = last_partition_id + 1;
290
291 let record = Record::from_data(
292 partition::LAST_IDS,
293 &[partition],
294 None,
295 &next_partition_id,
296 )?;
297
298 self.put(record).await?;
299 Ok(next_partition_id)
300 }
301}
302
303async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
304 match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
305 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
306 None => Ok(None),
307 }
308}
309
310async fn get_check_vtxo_state<S: StorageAdaptor>(
311 adaptor: &S,
312 vtxo_id: VtxoId,
313 allowed_states: &[VtxoStateKind],
314) -> anyhow::Result<SerdeVtxo> {
315 let vtxo = get_vtxo(adaptor, vtxo_id).await?
316 .context("vtxo not found")?;
317
318 let current_state = vtxo.current_state().context("vtxo has no state")?;
319 if !allowed_states.contains(¤t_state.kind()) {
320 bail!("current state {:?} not in allowed states {:?}",
321 current_state.kind(), allowed_states
322 );
323 }
324
325 Ok(vtxo)
326}
327
328async fn update_vtxo_state_checked<S: StorageAdaptor>(
329 adaptor: &mut S,
330 vtxo_id: VtxoId,
331 new_state: VtxoState,
332 allowed_old_states: &[VtxoStateKind],
333) -> anyhow::Result<WalletVtxo> {
334 let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
335
336 let sk = sort::vtxo_sort_key(
337 new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
338 );
339
340 serde_vtxo.states.push(new_state.clone());
341 let updated_record = Record::from_data(
342 partition::VTXO,
343 &vtxo_id.to_bytes(),
344 Some(sk),
345 &serde_vtxo,
346 )?;
347
348 adaptor.put(updated_record).await?;
349
350 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, new_state))
351}
352
353pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
354 inner: tokio::sync::RwLock<S>,
355}
356
357impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
358 pub fn new(inner: S) -> Self {
359 Self {
360 inner: tokio::sync::RwLock::new(inner),
361 }
362 }
363}
364
365#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
367#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
368impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
369 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
370 let record = Record::from_data(
371 partition::PROPERTIES,
372 &[],
374 None,
375 properties,
376 )?;
377 self.inner.write().await.put(record).await
378 }
379
380 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
381 match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
382 Some(record) => Ok(Some(record.to_data()?)),
383 None => Ok(None),
384 }
385 }
386
387 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
388 let mut properties = match self.read_properties().await? {
389 Some(properties) => properties,
390 None => bail!("wallet not initialized"),
391 };
392
393 properties.server_pubkey = Some(server_pubkey);
394
395 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
396 self.inner.write().await.put(record).await
397 }
398
399 async fn set_server_mailbox_pubkey(&self, server_mailbox_pubkey: PublicKey) -> anyhow::Result<()> {
400 let mut properties = match self.read_properties().await? {
401 Some(properties) => properties,
402 None => bail!("wallet not initialized"),
403 };
404
405 properties.server_mailbox_pubkey = Some(server_mailbox_pubkey);
406
407 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
408 self.inner.write().await.put(record).await
409 }
410
411 #[cfg(feature = "onchain-bdk")]
412 async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
413 match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
414 Some(record) => record.to_data(),
415 None => Ok(ChangeSet::default()),
416 }
417 }
418
419 #[cfg(feature = "onchain-bdk")]
420 async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
421 let mut current = self.initialize_bdk_wallet().await?;
422 current.merge(changeset.clone());
423
424 let record = Record::from_data(
425 partition::BDK_CHANGESET,
426 &[],
428 None,
429 ¤t,
430 )?;
431 self.inner.write().await.put(record).await
432 }
433
434 async fn create_new_movement(
435 &self,
436 status: MovementStatus,
437 subsystem: &MovementSubsystem,
438 time: DateTime<Local>,
439 ) -> anyhow::Result<MovementId> {
440 let mut lock = self.inner.write().await;
441
442 let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
443 let movement = Movement::new(id, status, subsystem, time);
444
445 let record = Record::from_data(
446 partition::MOVEMENT,
447 &id.to_bytes(),
448 Some(sort::movement_sort_key(&time)),
449 &movement,
450 )?;
451 lock.put(record).await?;
452
453 Ok(id)
454 }
455
456 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
457 let mut guard = self.inner.write().await;
458
459 let record = Record::from_data(
460 partition::MOVEMENT,
461 &movement.id.to_bytes(),
462 Some(sort::movement_sort_key(&movement.time.created_at)),
463 movement,
464 )?;
465 guard.put(record).await?;
466
467 let sent = movement.sent_to.iter().map(|d| &d.destination);
469 let rcvd = movement.received_on.iter().map(|d| &d.destination);
470 for pm in sent.chain(rcvd) {
471 let pm_bytes = serialize_payment_method(pm);
472 let primary_key = {
473 let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
475 buf.extend(pm_bytes.iter().copied());
476 buf.extend(movement.id.to_bytes());
477 buf
478 };
479 let record = Record::from_data(
480 partition::MOVEMENT_PAYMENT_METHOD,
481 &primary_key,
482 Some(SortKey::from_bytes(pm_bytes)),
483 &movement.id.0,
484 )?;
485 guard.put(record).await?;
486 }
487
488 Ok(())
489 }
490
491 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
492 self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
493 .await?
494 .context("movement not found")?
495 .to_data()
496 }
497
498 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
499 let records = self.inner.read().await
500 .query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
501 records.into_iter().map(|r| r.to_data()).collect()
502 }
503
504 async fn get_movements_by_payment_method(
505 &self,
506 payment_method: &PaymentMethod,
507 ) -> anyhow::Result<Vec<Movement>> {
508 let pm_bytes = serialize_payment_method(payment_method);
509 let sort_key = SortKey::from_bytes(pm_bytes);
510
511 let guard = self.inner.read().await;
512 let idx_recs = guard.query_sorted(Query::new(
513 partition::MOVEMENT_PAYMENT_METHOD,
514 sort_key.clone()..=sort_key,
515 )).await?;
516
517 let mut ret = Vec::with_capacity(idx_recs.len());
518 for idx_rec in idx_recs {
519 let id = MovementId::new(idx_rec.to_data::<u32>()
520 .context("corrupt db: movement payment method index value")?);
521
522 ret.push(
523 guard.get(partition::MOVEMENT, &id.to_bytes()).await?
524 .context("corrupt db: movement payment method entry for unknown movement")?
525 .to_data()
526 .context("corrupt db: invalid movement record")?
527 );
528 }
529 Ok(ret)
530 }
531
532 async fn store_pending_board(
533 &self,
534 vtxo: &Vtxo<Full>,
535 funding_tx: &Transaction,
536 movement_id: MovementId,
537 ) -> anyhow::Result<()> {
538 let pending_board = PendingBoard {
539 vtxos: vec![vtxo.id()],
540 amount: vtxo.amount(),
541 funding_tx: funding_tx.clone(),
542 movement_id,
543 };
544
545 let record = Record::from_data(
546 partition::PENDING_BOARD,
547 &vtxo.id().to_bytes(),
548 None,
549 &pending_board,
550 )?;
551
552 self.inner.write().await.put(record).await
553 }
554
555 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
556 self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
557 Ok(())
558 }
559
560 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
561 let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
562 records
563 .into_iter()
564 .map(|r| {
565 let board = r.to_data::<PendingBoard>()?;
566 Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
567 })
568 .collect()
569 }
570
571 async fn get_pending_board_by_vtxo_id(
572 &self,
573 vtxo_id: VtxoId,
574 ) -> anyhow::Result<Option<PendingBoard>> {
575 match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
576 Some(record) => Ok(Some(record.to_data()?)),
577 None => Ok(None),
578 }
579 }
580
581 async fn store_round_state_lock_vtxos(
582 &self,
583 round_state: &RoundState,
584 ) -> anyhow::Result<RoundStateId> {
585 let mut lock = self.inner.write().await;
586
587 let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
588
589 let allowed_states = &[VtxoStateKind::Spendable];
590
591 for vtxo in round_state.participation().inputs.iter() {
593 get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
594 }
595
596 for vtxo in round_state.participation().inputs.iter() {
597 update_vtxo_state_checked(
598 &mut *lock,
599 vtxo.id(),
600 VtxoState::Locked {
601 holder: round_state.movement_id
602 .map(|id| crate::vtxo::VtxoLockHolder::Movement { id }),
603 },
604 allowed_states,
605 ).await?;
606 }
607
608 let serde_state = SerdeRoundState::from(round_state);
609 let record = Record::from_data(
610 partition::ROUND_STATE,
611 &id.to_bytes(),
612 Some(sort::SortKey::u32_asc(id.0)),
613 &serde_state,
614 )?;
615 lock.put(record).await?;
616
617 Ok(id)
618 }
619
620 async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
621 let serde_state = SerdeRoundState::from(round_state.state());
622 let record = Record::from_data(
623 partition::ROUND_STATE,
624 &round_state.id().to_bytes(),
625 Some(sort::SortKey::u32_asc(round_state.id().0)),
626 &serde_state,
627 )?;
628 self.inner.write().await.put(record).await
629 }
630
631 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
632 self.inner.write().await
633 .delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
634 Ok(())
635 }
636
637 async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
638 let record = self.inner.read().await
639 .get(partition::ROUND_STATE, &_id.to_bytes()).await?;
640 match record {
641 Some(r) => {
642 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
643 let id = RoundStateId(u32::from_be_bytes(pk_slice));
644 let state = r.to_data::<SerdeRoundState>()?.into();
645 Ok(Some(StoredRoundState::new(id, state)))
646 },
647 None => Ok(None),
648 }
649 }
650
651 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
652 let records = self.inner.read().await
653 .get_all(partition::ROUND_STATE).await?;
654 records.into_iter()
655 .map(|r| {
656 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
657 Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
658 })
659 .collect()
660 }
661
662 async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
663 let mut lock = self.inner.write().await;
664
665 for (vtxo, state) in vtxos {
666 let serde_vtxo = SerdeVtxo {
667 vtxo: (*vtxo).clone(),
668 states: vec![(*state).clone()],
669 };
670
671 let sk = sort::vtxo_sort_key(
672 state.kind(), vtxo.expiry_height(), vtxo.amount(),
673 );
674 let record = Record::from_data(
675 partition::VTXO,
676 &vtxo.id().to_bytes(),
677 Some(sk),
678 &serde_vtxo,
679 )?;
680 lock.put(record).await?;
681 }
682 Ok(())
683 }
684
685 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
686 let lock = self.inner.read().await;
687 match get_vtxo(&*lock, id).await? {
688 Some(serde_vtxo) => {
689 let state = serde_vtxo.current_state()
690 .context("vtxo has no state")?.clone();
691 Ok(Some(wallet_vtxo_from_full(&serde_vtxo.vtxo, state)))
692 },
693 None => Ok(None),
694 }
695 }
696
697 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
698 let records = self.inner.read().await
699 .query_sorted(Query::new_full_range(partition::VTXO)).await?;
700
701 records
702 .into_iter()
703 .map(|r| {
704 let serde_vtxo = r.to_data::<SerdeVtxo>()?;
705 let state = serde_vtxo
706 .current_state()
707 .cloned()
708 .context("vtxo has no state")?;
709 Ok(wallet_vtxo_from_full(&serde_vtxo.vtxo, state))
710 })
711 .collect()
712 }
713
714 async fn get_vtxos_by_state(
715 &self,
716 states: &[VtxoStateKind],
717 ) -> anyhow::Result<Vec<WalletVtxo>> {
718 let lock = self.inner.read().await;
719
720 let range = |state: VtxoStateKind| {
721 let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
722 let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
723 (start, end)
724 };
725
726 let mut records = Vec::new();
727 for state in states {
728 let (start, end) = range(*state);
729 let query = Query::new(partition::VTXO, start..=end);
730
731 for record in lock.query_sorted(query).await? {
732 let serde_vtxo = record.to_data::<SerdeVtxo>()?;
733 let current_state = serde_vtxo.current_state()
734 .context("vtxo has no current state")?.clone();
735 debug_assert_eq!(current_state.kind(), *state);
736 records.push(wallet_vtxo_from_full(&serde_vtxo.vtxo, current_state));
737 }
738 }
739
740 Ok(records)
741 }
742
743 async fn get_full_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
744 let lock = self.inner.read().await;
745 Ok(get_vtxo(&*lock, id).await?.map(|s| s.vtxo))
746 }
747
748 async fn get_full_vtxos(&self, ids: &[VtxoId]) -> anyhow::Result<Vec<Vtxo<Full>>> {
749 let lock = self.inner.read().await;
750 let mut out = Vec::with_capacity(ids.len());
751 for id in ids {
752 let serde_vtxo = get_vtxo(&*lock, *id).await?
753 .with_context(|| format!("vtxo {id} not found"))?;
754 out.push(serde_vtxo.vtxo);
755 }
756 Ok(out)
757 }
758
759 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
760 match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
761 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
762 None => Ok(None),
763 }
764 }
765
766 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
767 match self.get_wallet_vtxo(id).await? {
768 Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
769 None => Ok(false),
770 }
771 }
772
773 async fn update_vtxo_state_checked(
774 &self,
775 vtxo_id: VtxoId,
776 new_state: VtxoState,
777 allowed_old_states: &[VtxoStateKind],
778 ) -> anyhow::Result<WalletVtxo> {
779 let mut lock = self.inner.write().await;
780 update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
781 }
782
783 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
784 let vtxo_key = SerdeVtxoKey { index, public_key };
785 let record = Record::from_data(
786 partition::PUBLIC_KEY,
787 &public_key.serialize()[..],
788 Some(sort::SortKey::u64_desc(index as u64)),
789 &vtxo_key,
790 )?;
791 self.inner.write().await.put(record).await
792 }
793
794 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
795 let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
797 let records = self.inner.read().await.query_sorted(query).await?;
798
799 match records.into_iter().next() {
800 Some(record) => {
801 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
802 Ok(Some(vtxo_key.index))
803 }
804 None => Ok(None),
805 }
806 }
807
808 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
809 match self.inner.read().await
810 .get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
811 {
812 Some(record) => {
813 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
814 Ok(Some(vtxo_key.index))
815 }
816 None => Ok(None),
817 }
818 }
819
820 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
821 match self.inner.read().await
822 .get(partition::MAILBOX_CHECKPOINT, &[]).await?
823 {
824 Some(record) => Ok(record.to_data::<u64>()?),
825 None => Ok(0),
826 }
827 }
828
829 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
830 let mut lock = self.inner.write().await;
831 let record = Record::from_data(
832 partition::MAILBOX_CHECKPOINT,
833 &[],
834 None,
835 &checkpoint,
836 )?;
837 lock.put(record).await?;
838 Ok(())
839 }
840
841 async fn store_new_pending_lightning_send(
842 &self,
843 invoice: &Invoice,
844 amount: Amount,
845 fee: Amount,
846 vtxo_ids: &[VtxoId],
847 movement_id: MovementId,
848 ) -> anyhow::Result<LightningSend> {
849 let mut lock = self.inner.write().await;
850 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
851 for vtxo_id in vtxo_ids {
852 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
853 .context("vtxo not found")?;
854 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
855 }
856
857 let lightning_send = LightningSend {
858 invoice: invoice.clone(),
859 amount,
860 fee,
861 htlc_vtxos,
862 preimage: None,
863 movement_id,
864 finished_at: None,
865 };
866
867 let record = Record::from_data(
868 partition::LIGHTNING_SEND,
869 &invoice.payment_hash().to_byte_array(),
870 None,
871 &lightning_send,
872 )?;
873
874 lock.put(record).await?;
875
876 Ok(lightning_send)
877 }
878
879 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
880 let records = self.inner.read().await
881 .get_all(partition::LIGHTNING_SEND).await?;
882 records
883 .into_iter()
884 .filter_map(|r| {
885 let send = r.to_data::<LightningSend>().ok()?;
886 if send.finished_at.is_none() {
887 Some(Ok(send))
888 } else {
889 None
890 }
891 })
892 .collect()
893 }
894
895 async fn finish_lightning_send(
896 &self,
897 payment_hash: PaymentHash,
898 preimage: Option<Preimage>,
899 ) -> anyhow::Result<()> {
900 let mut lock = self.inner.write().await;
901
902 let pk = payment_hash.to_byte_array();
903 let record = lock
904 .get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
905 let mut lightning_send: LightningSend = record.to_data()?;
906
907 lightning_send.preimage = preimage;
908 lightning_send.finished_at = Some(Local::now());
909
910 let updated_record = Record::from_data(
911 partition::LIGHTNING_SEND,
912 &pk,
913 None,
914 &lightning_send,
915 )?;
916 lock.put(updated_record).await?;
917
918 Ok(())
919 }
920
921 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
922 self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
923 Ok(())
924 }
925
926 async fn get_lightning_send(
927 &self,
928 payment_hash: PaymentHash,
929 ) -> anyhow::Result<Option<LightningSend>> {
930 match self.inner.read().await
931 .get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
932 {
933 Some(record) => Ok(Some(record.to_data()?)),
934 None => Ok(None),
935 }
936 }
937
938 async fn upsert_wallet_action_checkpoint(
939 &self,
940 id: &WalletActionId,
941 checkpoint: &WalletActionCheckpoint,
942 ) -> anyhow::Result<()> {
943 let record = Record::from_data(
944 partition::WALLET_ACTION_CHECKPOINT,
945 id.as_bytes(),
946 None,
947 checkpoint,
948 )?;
949 self.inner.write().await.put(record).await
950 }
951
952 async fn get_wallet_action_checkpoint(
953 &self,
954 id: &WalletActionId,
955 ) -> anyhow::Result<Option<WalletActionCheckpoint>> {
956 match self.inner.read().await
957 .get(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?
958 {
959 Some(record) => Ok(Some(record.to_data()?)),
960 None => Ok(None),
961 }
962 }
963
964 async fn get_all_wallet_action_checkpoints(
965 &self,
966 ) -> anyhow::Result<Vec<WalletActionCheckpoint>> {
967 let records = self.inner.read().await
968 .get_all(partition::WALLET_ACTION_CHECKPOINT).await?;
969 records.into_iter().map(|r| r.to_data()).collect()
970 }
971
972 async fn remove_wallet_action_checkpoint(
973 &self,
974 id: &WalletActionId,
975 ) -> anyhow::Result<()> {
976 self.inner.write().await
977 .delete(partition::WALLET_ACTION_CHECKPOINT, id.as_bytes()).await?;
978 Ok(())
979 }
980
981 async fn store_lightning_receive(
982 &self,
983 payment_hash: PaymentHash,
984 preimage: Preimage,
985 invoice: &Bolt11Invoice,
986 htlc_recv_cltv_delta: BlockDelta,
987 ) -> anyhow::Result<()> {
988 let lightning_receive = LightningReceive {
989 payment_hash,
990 payment_preimage: preimage,
991 invoice: invoice.clone(),
992 htlc_recv_cltv_delta,
993 htlc_vtxos: vec![],
994 movement_id: None,
995 finished_at: None,
996 preimage_revealed_at: None,
997 };
998
999 let record = Record::from_data(
1000 partition::LIGHTNING_RECEIVE,
1001 &payment_hash.to_byte_array(),
1002 None,
1003 &lightning_receive,
1004 )?;
1005 self.inner.write().await.put(record).await
1006 }
1007
1008 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
1009 let records = self.inner.read().await
1010 .get_all(partition::LIGHTNING_RECEIVE).await?;
1011 records
1012 .into_iter()
1013 .filter_map(|r| {
1014 let receive = r.to_data::<LightningReceive>().ok()?;
1015 if receive.finished_at.is_none() {
1016 Some(Ok(receive))
1017 } else {
1018 None
1019 }
1020 })
1021 .collect()
1022 }
1023
1024 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
1025 let mut lock = self.inner.write().await;
1026
1027 let pk = payment_hash.to_byte_array();
1028 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1029 .context("lightning receive not found")?;
1030 let mut lightning_receive: LightningReceive = record.to_data()?;
1031
1032 lightning_receive.preimage_revealed_at = Some(Local::now());
1033
1034 let updated_record = Record::from_data(
1035 partition::LIGHTNING_RECEIVE,
1036 &pk,
1037 None,
1038 &lightning_receive,
1039 )?;
1040 lock.put(updated_record).await
1041 }
1042
1043 async fn update_lightning_receive(
1044 &self,
1045 payment_hash: PaymentHash,
1046 vtxo_ids: &[VtxoId],
1047 movement_id: MovementId,
1048 ) -> anyhow::Result<()> {
1049 let mut lock = self.inner.write().await;
1050 let pk = payment_hash.to_byte_array();
1051 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1052 .context("lightning receive not found")?;
1053 let mut lightning_receive: LightningReceive = record.to_data()?;
1054
1055 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
1056 for vtxo_id in vtxo_ids {
1057 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
1058 .context("vtxo not found")?;
1059 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
1060 }
1061
1062 lightning_receive.htlc_vtxos = htlc_vtxos;
1063 lightning_receive.movement_id = Some(movement_id);
1064
1065 let updated_record = Record::from_data(
1066 partition::LIGHTNING_RECEIVE,
1067 &pk,
1068 None,
1069 &lightning_receive,
1070 )?;
1071 lock.put(updated_record).await
1072 }
1073
1074 async fn fetch_lightning_receive_by_payment_hash(
1075 &self,
1076 payment_hash: PaymentHash,
1077 ) -> anyhow::Result<Option<LightningReceive>> {
1078 match self.inner.read().await
1079 .get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1080 {
1081 Some(record) => Ok(Some(record.to_data()?)),
1082 None => Ok(None),
1083 }
1084 }
1085
1086 async fn finish_pending_lightning_receive(
1087 &self,
1088 payment_hash: PaymentHash,
1089 ) -> anyhow::Result<()> {
1090 let mut lock = self.inner.write().await;
1091 let pk = payment_hash.to_byte_array();
1092 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1093 .context("lightning receive not found")?;
1094 let mut lightning_receive: LightningReceive = record.to_data()?;
1095
1096 lightning_receive.finished_at = Some(Local::now());
1097
1098 let updated_record = Record::from_data(
1099 partition::LIGHTNING_RECEIVE,
1100 &pk,
1101 None,
1102 &lightning_receive,
1103 )?;
1104 lock.put(updated_record).await
1105 }
1106
1107 async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1108 let record = Record::from_data(
1109 partition::PENDING_OFFBOARD,
1110 &pending.movement_id.to_bytes(),
1111 None,
1112 pending,
1113 )?;
1114 self.inner.write().await.put(record).await
1115 }
1116
1117 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1118 let records = self.inner.read().await
1119 .get_all(partition::PENDING_OFFBOARD).await?;
1120 records.into_iter().map(|r| r.to_data()).collect()
1121 }
1122
1123 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1124 self.inner.write().await
1125 .delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1126 Ok(())
1127 }
1128
1129 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1130 let record = Record::from_data(
1131 partition::EXIT_VTXO,
1132 &exit.vtxo_id.to_bytes(),
1133 None,
1134 exit,
1135 )?;
1136 self.inner.write().await.put(record).await
1137 }
1138
1139 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1140 self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1141 Ok(())
1142 }
1143
1144 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1145 let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1146 records.into_iter().map(|r| r.to_data()).collect()
1147 }
1148
1149 async fn store_exit_child_tx(
1150 &self,
1151 exit_txid: Txid,
1152 child_tx: &Transaction,
1153 origin: ExitTxOrigin,
1154 ) -> anyhow::Result<()> {
1155 let exit_child = SerdeExitChildTx {
1156 child_tx: child_tx.clone(),
1157 origin,
1158 };
1159 let record = Record::from_data(
1160 partition::EXIT_CHILD_TX,
1161 &exit_txid.to_byte_array(),
1162 None,
1163 &exit_child,
1164 )?;
1165 self.inner.write().await.put(record).await
1166 }
1167
1168 async fn get_exit_child_tx(
1169 &self,
1170 exit_txid: Txid,
1171 ) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1172 match self.inner.read().await
1173 .get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1174 {
1175 Some(record) => {
1176 let exit_child = record.to_data::<SerdeExitChildTx>()?;
1177 Ok(Some((exit_child.child_tx, exit_child.origin)))
1178 }
1179 None => Ok(None),
1180 }
1181 }
1182}
1183
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder keeps the partition and full range and records the limit.
    #[test]
    fn storage_query_builder() {
        let Query { partition, range, limit } = Query::new_full_range(0).limit(10);

        assert_eq!(partition, 0);
        assert_eq!(limit, Some(10));
        assert_eq!(range, ..);
    }
}
1197
1198#[cfg(test)]
1213pub mod test_suite {
1214 use super::*;
1215 use super::partition::LAST_IDS;
1216 use super::sort::SortKey;
1217
1218 async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1219 for partition in partitions {
1220 let records = storage.get_all(*partition).await?;
1221 for record in records {
1222 storage.delete(record.partition, &record.pk).await?;
1223 }
1224 }
1225 Ok(())
1226 }
1227
1228 pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
1230 test_put_insert(storage).await;
1232 test_put_upsert(storage).await;
1233 test_put_with_sort_key(storage).await;
1234 test_put_without_sort_key(storage).await;
1235 test_put_multiple_partitions(storage).await;
1236
1237 test_get_existing(storage).await;
1239 test_get_after_update(storage).await;
1240
1241 test_delete_existing(storage).await;
1243 test_delete_nonexistent(storage).await;
1244 test_delete_idempotent(storage).await;
1245
1246 test_query_empty_partition(storage).await;
1248 test_query_returns_partition_records(storage).await;
1249 test_query_ordering(storage).await;
1250 test_query_with_limit(storage).await;
1251 test_query_null_sort_key_excluded(storage).await;
1252 test_query_partition_isolation(storage).await;
1253 test_query_range(storage).await;
1254 test_query_exclusive_end_range(storage).await;
1255 test_query_full_range_limit_one(storage).await;
1256
1257 test_get_all_empty_partition(storage).await;
1259 test_get_all_returns_all_records(storage).await;
1260 test_get_all_includes_records_without_sort_key(storage).await;
1261 test_get_all_partition_isolation(storage).await;
1262 test_get_all_after_delete(storage).await;
1263
1264 test_incremental_id_starts_at_one(storage).await;
1266 test_incremental_id_increments(storage).await;
1267 test_incremental_id_partition_isolation(storage).await;
1268 test_incremental_id_persists_across_operations(storage).await;
1269 }
1270
1271 pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1273 let record = Record {
1274 pk: "put_insert_1".into(),
1275 partition: 0,
1276 sort_key: None,
1277 data: b"test data".to_vec(),
1278 };
1279
1280 storage.put(record).await.expect("put should succeed");
1281
1282 let retrieved = storage
1283 .get(0, b"put_insert_1")
1284 .await
1285 .expect("get should succeed")
1286 .expect("record should exist");
1287
1288 assert_eq!(retrieved.pk, b"put_insert_1");
1289 assert_eq!(retrieved.partition, 0);
1290 assert_eq!(retrieved.data, b"test data");
1291 }
1292
1293 pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1295 let record1 = Record {
1296 pk: b"put_upsert_1".into(),
1297 partition: 0,
1298 sort_key: None,
1299 data: b"original".to_vec(),
1300 };
1301 storage.put(record1).await.expect("first put should succeed");
1302
1303 let record2 = Record {
1304 pk: "put_upsert_1".into(),
1305 partition: 0,
1306 sort_key: None,
1307 data: b"updated".to_vec(),
1308 };
1309 storage
1310 .put(record2)
1311 .await
1312 .expect("second put should succeed");
1313
1314 let retrieved = storage
1315 .get(0, b"put_upsert_1")
1316 .await
1317 .expect("get should succeed")
1318 .expect("record should exist");
1319
1320 assert_eq!(retrieved.data, b"updated", "data should be updated");
1321 }
1322
1323 pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1325 let sort_key = SortKey::u32_asc(42);
1326 let record = Record {
1327 pk: b"put_sort_key_1".into(),
1328 partition: 0,
1329 sort_key: Some(sort_key.clone()),
1330 data: b"with sort key".to_vec(),
1331 };
1332
1333 storage.put(record).await.expect("put should succeed");
1334
1335 let retrieved = storage
1336 .get(0, b"put_sort_key_1")
1337 .await
1338 .expect("get should succeed")
1339 .expect("record should exist");
1340
1341 assert_eq!(retrieved.sort_key, Some(sort_key));
1342 }
1343
1344 pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1346 let record = Record {
1347 pk: b"put_no_sort_key_1".into(),
1348 partition: 0,
1349 sort_key: None,
1350 data: b"no sort key".to_vec(),
1351 };
1352
1353 storage.put(record).await.expect("put should succeed");
1354
1355 let retrieved = storage
1356 .get(0, b"put_no_sort_key_1")
1357 .await
1358 .expect("get should succeed")
1359 .expect("record should exist");
1360
1361 assert!(retrieved.sort_key.is_none());
1362 }
1363
1364 pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1366 let record_a = Record {
1367 pk: "put_multi_a".into(),
1368 partition: 0,
1369 sort_key: None,
1370 data: b"in partition a".to_vec(),
1371 };
1372 let record_b = Record {
1373 pk: "put_multi_b".into(),
1374 partition: 1,
1375 sort_key: None,
1376 data: b"in partition b".to_vec(),
1377 };
1378
1379 storage.put(record_a).await.expect("put a should succeed");
1380 storage.put(record_b).await.expect("put b should succeed");
1381
1382 let retrieved_a = storage
1383 .get(0, b"put_multi_a")
1384 .await
1385 .expect("get should succeed")
1386 .expect("record a should exist");
1387 let retrieved_b = storage
1388 .get(1, b"put_multi_b")
1389 .await
1390 .expect("get should succeed")
1391 .expect("record b should exist");
1392
1393 assert_eq!(retrieved_a.partition, 0);
1394 assert_eq!(retrieved_b.partition, 1);
1395 }
1396
1397 pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1399 let record = Record {
1400 pk: b"get_existing_1".into(),
1401 partition: 0,
1402 sort_key: Some(SortKey::u32_asc(100)),
1403 data: b"test".to_vec(),
1404 };
1405 storage.put(record).await.expect("put should succeed");
1406
1407 let retrieved = storage
1408 .get(0, b"get_existing_1")
1409 .await
1410 .expect("get should succeed");
1411
1412 assert!(retrieved.is_some());
1413 let retrieved = retrieved.unwrap();
1414 assert_eq!(retrieved.pk, b"get_existing_1");
1415 assert_eq!(retrieved.partition, 0);
1416 assert_eq!(retrieved.data, b"test");
1417
1418 assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1420 assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1422
1423 assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1425 }
1426
1427 pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1429 let record1 = Record {
1430 pk: b"get_after_update_1".into(),
1431 partition: 0,
1432 sort_key: None,
1433 data: b"version1".to_vec(),
1434 };
1435 storage.put(record1).await.expect("put should succeed");
1436
1437 let record2 = Record {
1438 pk: b"get_after_update_1".into(),
1439 partition: 0,
1440 sort_key: None,
1441 data: b"version2".to_vec(),
1442 };
1443 storage.put(record2).await.expect("put should succeed");
1444
1445 let retrieved = storage
1446 .get(0, b"get_after_update_1")
1447 .await
1448 .expect("get should succeed")
1449 .expect("record should exist");
1450
1451 assert_eq!(retrieved.data, b"version2");
1452 }
1453
1454 pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1456 let record = Record {
1457 pk: b"delete_existing_1".into(),
1458 partition: 0,
1459 sort_key: None,
1460 data: b"to delete".to_vec(),
1461 };
1462 storage.put(record.clone()).await.expect("put should succeed");
1463
1464 let deleted_record = storage
1465 .delete(0, b"delete_existing_1")
1466 .await
1467 .expect("delete should succeed");
1468
1469 assert_eq!(deleted_record, Some(record));
1470
1471 let retrieved = storage
1472 .get(0, b"delete_existing_1")
1473 .await
1474 .expect("get should succeed");
1475 assert!(retrieved.is_none(), "record should no longer exist");
1476 }
1477
1478 pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1480 let deleted_record = storage
1481 .delete(0, b"delete_nonexistent_does_not_exist")
1482 .await
1483 .expect("delete should succeed");
1484
1485 assert!(
1486 deleted_record.is_none(),
1487 "delete should return None for non-existent record"
1488 );
1489 }
1490
1491 pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1493 let record = Record {
1494 pk: b"delete_idempotent_1".into(),
1495 partition: 0,
1496 sort_key: None,
1497 data: b"delete twice".to_vec(),
1498 };
1499 storage.put(record.clone()).await.expect("put should succeed");
1500
1501 let first_delete = storage
1502 .delete(0, b"delete_idempotent_1")
1503 .await
1504 .expect("first delete should succeed");
1505 let second_delete = storage
1506 .delete(0, b"delete_idempotent_1")
1507 .await
1508 .expect("second delete should succeed");
1509
1510 assert_eq!(first_delete, Some(record), "first delete should return the record");
1511 assert_eq!(second_delete, None, "second delete should return None");
1512 }
1513
1514 pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1516 clear_partitions(storage, &[0]).await.unwrap();
1517 let results = storage
1518 .query_sorted(Query::new_full_range(0))
1519 .await
1520 .expect("query should succeed");
1521
1522 assert!(results.is_empty());
1523 }
1524
1525 pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1527 clear_partitions(storage, &[0]).await.unwrap();
1528 for i in 0..3 {
1529 let record = Record {
1530 pk: format!("query_partition_{}", i).into(),
1531 partition: 0,
1532 sort_key: Some(SortKey::u32_asc(i)),
1533 data: format!("record_{}", i).as_bytes().to_vec(),
1534 };
1535 storage.put(record).await.expect("put should succeed");
1536 }
1537
1538 let results = storage
1539 .query_sorted(Query::new_full_range(0))
1540 .await
1541 .expect("query should succeed");
1542
1543 assert_eq!(results.len(), 3);
1544 }
1545
1546 pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1548 clear_partitions(storage, &[0]).await.unwrap();
1549 for i in [5, 2, 8, 1, 9] {
1551 let record = Record {
1552 pk: format!("query_asc_{}", i).into(),
1553 partition: 0,
1554 sort_key: Some(SortKey::u32_asc(i)),
1555 data: format!("record_{}", i).as_bytes().to_vec(),
1556 };
1557 storage.put(record).await.expect("put should succeed");
1558 }
1559
1560 let results = storage
1561 .query_sorted(Query::new_full_range(0))
1562 .await
1563 .expect("query should succeed");
1564
1565 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1566 assert_eq!(
1567 values,
1568 vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1569 "should be in ascending order"
1570 );
1571 }
1572
1573 pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1575 clear_partitions(storage, &[0]).await.unwrap();
1576 for i in 0..10 {
1577 let record = Record {
1578 pk: format!("query_limit_{}", i).into(),
1579 partition: 0,
1580 sort_key: Some(SortKey::u32_asc(i)),
1581 data: format!("record_{}", i).as_bytes().to_vec(),
1582 };
1583 storage.put(record).await.expect("put should succeed");
1584 }
1585
1586 let results = storage
1587 .query_sorted(Query::new_full_range(0).limit(3))
1588 .await
1589 .expect("query should succeed");
1590
1591 assert_eq!(results.len(), 3);
1592 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1593 assert_eq!(
1594 values,
1595 vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1596 "should return first 3 records"
1597 );
1598 }
1599
1600 pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1602 clear_partitions(storage, &[0]).await.unwrap();
1603 let with_key_1 = Record {
1605 pk: "query_null_with_1".into(),
1606 partition: 0,
1607 sort_key: Some(SortKey::u32_asc(1)),
1608 data: b"with_key_1".to_vec(),
1609 };
1610 let with_key_2 = Record {
1611 pk: "query_null_with_2".into(),
1612 partition: 0,
1613 sort_key: Some(SortKey::u32_asc(2)),
1614 data: b"with_key_2".to_vec(),
1615 };
1616
1617 let without_key = Record {
1619 pk: "query_null_without".into(),
1620 partition: 0,
1621 sort_key: None,
1622 data: b"no_key".to_vec(),
1623 };
1624
1625 storage.put(with_key_1).await.expect("put should succeed");
1626 storage.put(without_key).await.expect("put should succeed");
1627 storage.put(with_key_2).await.expect("put should succeed");
1628
1629 let results_query = storage.query_sorted(Query::new_full_range(0)).await
1631 .expect("query should succeed");
1632 assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1633 assert_eq!(results_query[0].data, b"with_key_1");
1634 assert_eq!(results_query[1].data, b"with_key_2");
1635
1636 let results_all = storage.get_all(0).await
1638 .expect("get_all should succeed");
1639 assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1640
1641 let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1643 let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1644 let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1645 assert!(has_with_key_1, "get_all should include with_key_1");
1646 assert!(has_with_key_2, "get_all should include with_key_2");
1647 assert!(has_without_key, "get_all should include record without sort key");
1648 }
1649
1650 pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1652 clear_partitions(storage, &[0, 1]).await.unwrap();
1653 for i in 0..3 {
1655 let record = Record {
1656 pk: format!("query_iso_a_{}", i).into(),
1657 partition: 0,
1658 sort_key: Some(SortKey::u32_asc(i)),
1659 data: format!("record_{}", i).as_bytes().to_vec(),
1660 };
1661 storage.put(record).await.expect("put should succeed");
1662 }
1663
1664 for i in 0..5 {
1666 let record = Record {
1667 pk: format!("query_iso_b_{}", i).into(),
1668 partition: 1,
1669 sort_key: Some(SortKey::u32_asc(i)),
1670 data: format!("record_{}", i + 100).as_bytes().to_vec(),
1671 };
1672 storage.put(record).await.expect("put should succeed");
1673 }
1674
1675 let results_a = storage
1676 .query_sorted(Query::new_full_range(0))
1677 .await
1678 .expect("query should succeed");
1679
1680 let results_b = storage
1681 .query_sorted(Query::new_full_range(1))
1682 .await
1683 .expect("query should succeed");
1684
1685 assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1686 assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1687
1688 assert!(results_a
1690 .iter()
1691 .all(|r| r.partition == 0));
1692
1693 assert!(results_b
1695 .iter()
1696 .all(|r| r.partition == 1));
1697 }
1698
1699 pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1701 clear_partitions(storage, &[0]).await.unwrap();
1702
1703 for i in 1..=10u32 {
1705 let record = Record {
1706 pk: format!("query_range_{}", i).into(),
1707 partition: 0,
1708 sort_key: Some(SortKey::u32_asc(i)),
1709 data: format!("record_{}", i).as_bytes().to_vec(),
1710 };
1711 storage.put(record).await.expect("put should succeed");
1712 }
1713
1714 let results_start = storage
1716 .query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1717 .await
1718 .expect("query should succeed");
1719
1720 assert_eq!(results_start.len(), 6, "should return records 5-10");
1721 let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1722 assert_eq!(
1723 values,
1724 vec![
1725 b"record_5".to_vec(),
1726 b"record_6".to_vec(),
1727 b"record_7".to_vec(),
1728 b"record_8".to_vec(),
1729 b"record_9".to_vec(),
1730 b"record_10".to_vec(),
1731 ],
1732 "should return records from 5 onwards"
1733 );
1734
1735 let results_end = storage
1737 .query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1738 .await
1739 .expect("query should succeed");
1740
1741 assert_eq!(results_end.len(), 3, "should return records 1-3");
1742 let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1743 assert_eq!(
1744 values,
1745 vec![
1746 b"record_1".to_vec(),
1747 b"record_2".to_vec(),
1748 b"record_3".to_vec(),
1749 ],
1750 "should return records up to 3"
1751 );
1752
1753 let results_range = storage
1755 .query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1756 .await
1757 .expect("query should succeed");
1758
1759 assert_eq!(results_range.len(), 5, "should return records 3-7");
1760 let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1761 assert_eq!(
1762 values,
1763 vec![
1764 b"record_3".to_vec(),
1765 b"record_4".to_vec(),
1766 b"record_5".to_vec(),
1767 b"record_6".to_vec(),
1768 b"record_7".to_vec(),
1769 ],
1770 "should return records in range 3-7"
1771 );
1772
1773 let results_range_limit = storage
1775 .query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1776 .await
1777 .expect("query should succeed");
1778
1779 assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1780 let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1781 assert_eq!(
1782 values,
1783 vec![
1784 b"record_2".to_vec(),
1785 b"record_3".to_vec(),
1786 b"record_4".to_vec(),
1787 ],
1788 "should return first 3 records in range"
1789 );
1790
1791 let results_empty = storage
1793 .query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1794 .await
1795 .expect("query should succeed");
1796
1797 assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1798 }
1799
1800 pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1802 clear_partitions(storage, &[0]).await.unwrap();
1803
1804 for i in 1..=10u32 {
1805 let record = Record {
1806 pk: format!("query_excl_{}", i).into(),
1807 partition: 0,
1808 sort_key: Some(SortKey::u32_asc(i)),
1809 data: format!("record_{}", i).as_bytes().to_vec(),
1810 };
1811 storage.put(record).await.expect("put should succeed");
1812 }
1813
1814 let results = storage
1816 .query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1817 .await
1818 .expect("query should succeed");
1819
1820 assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1821 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1822 assert_eq!(
1823 values,
1824 vec![
1825 b"record_3".to_vec(),
1826 b"record_4".to_vec(),
1827 b"record_5".to_vec(),
1828 b"record_6".to_vec(),
1829 ],
1830 );
1831
1832 let results = storage
1834 .query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1835 .await
1836 .expect("query should succeed");
1837
1838 assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1839 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1840 assert_eq!(
1841 values,
1842 vec![
1843 b"record_1".to_vec(),
1844 b"record_2".to_vec(),
1845 b"record_3".to_vec(),
1846 ],
1847 );
1848 }
1849
1850 pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1852 clear_partitions(storage, &[0]).await.unwrap();
1853
1854 for i in [5u32, 2, 8, 1, 9] {
1855 let record = Record {
1856 pk: format!("query_limit1_{}", i).into(),
1857 partition: 0,
1858 sort_key: Some(SortKey::u32_asc(i)),
1859 data: format!("record_{}", i).as_bytes().to_vec(),
1860 };
1861 storage.put(record).await.expect("put should succeed");
1862 }
1863
1864 let results = storage
1865 .query_sorted(Query::new_full_range(0).limit(1))
1866 .await
1867 .expect("query should succeed");
1868
1869 assert_eq!(results.len(), 1);
1870 assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1871 }
1872
1873 pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1875 storage.delete(LAST_IDS, b"0").await.unwrap();
1877 let id = storage.incremental_id(0).await
1878 .expect("incremental_id should succeed");
1879
1880 assert_eq!(id, 1, "first id should be 1");
1881 }
1882
1883 pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1885 clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1886
1887 let id1 = storage.incremental_id(0).await
1888 .expect("incremental_id should succeed");
1889 let id2 = storage.incremental_id(0).await
1890 .expect("incremental_id should succeed");
1891 let id3 = storage.incremental_id(0).await
1892 .expect("incremental_id should succeed");
1893
1894 assert_eq!(id1, 1, "first id should be 1");
1895 assert_eq!(id2, 2, "second id should be 2");
1896 assert_eq!(id3, 3, "third id should be 3");
1897 }
1898
1899 pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1901 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1902
1903 let a1 = storage.incremental_id(0).await
1905 .expect("incremental_id should succeed");
1906 let a2 = storage.incremental_id(0).await
1907 .expect("incremental_id should succeed");
1908 let a3 = storage.incremental_id(0).await
1909 .expect("incremental_id should succeed");
1910
1911 let b1 = storage.incremental_id(1).await
1913 .expect("incremental_id should succeed");
1914 let b2 = storage.incremental_id(1).await
1915 .expect("incremental_id should succeed");
1916
1917 assert_eq!(a1, 1);
1919 assert_eq!(a2, 2);
1920 assert_eq!(a3, 3);
1921
1922 assert_eq!(b1, 1);
1924 assert_eq!(b2, 2);
1925
1926 let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1928 assert_eq!(a4, 4);
1929 }
1930
1931 pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1933 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1934
1935 let id1 = storage.incremental_id(0).await
1937 .expect("incremental_id should succeed");
1938 let id2 = storage.incremental_id(0).await
1939 .expect("incremental_id should succeed");
1940 assert_eq!(id1, 1);
1941 assert_eq!(id2, 2);
1942
1943 let stored = storage
1945 .get(LAST_IDS, &[0])
1946 .await
1947 .expect("get should succeed")
1948 .expect("id record should exist");
1949 let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1950 assert_eq!(stored_id, 2, "stored id should be 2");
1951
1952 let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1954 assert_eq!(id3, 3);
1955 }
1956
1957 pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1959 clear_partitions(storage, &[0]).await.unwrap();
1960 let results = storage
1961 .get_all(0)
1962 .await
1963 .expect("get_all should succeed");
1964
1965 assert!(results.is_empty(), "get_all should return empty for empty partition");
1966 }
1967
1968 pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1970 clear_partitions(storage, &[0]).await.unwrap();
1971
1972 for i in 0..5 {
1974 let record = Record {
1975 pk: format!("get_all_{}", i).into(),
1976 partition: 0,
1977 sort_key: Some(SortKey::u32_asc(i)),
1978 data: format!("record_{}", i).as_bytes().to_vec(),
1979 };
1980 storage.put(record).await.expect("put should succeed");
1981 }
1982
1983 let results = storage
1984 .get_all(0)
1985 .await
1986 .expect("get_all should succeed");
1987
1988 assert_eq!(results.len(), 5, "get_all should return all 5 records");
1989
1990 for i in 0..5 {
1992 let expected_data = format!("record_{}", i).as_bytes().to_vec();
1993 let found = results.iter().any(|r| r.data == expected_data);
1994 assert!(found, "get_all should include record_{}", i);
1995 }
1996 }
1997
1998 pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
2000 clear_partitions(storage, &[0]).await.unwrap();
2001
2002 let with_key_1 = Record {
2004 pk: "get_all_with_1".into(),
2005 partition: 0,
2006 sort_key: Some(SortKey::u32_asc(1)),
2007 data: b"with_key_1".to_vec(),
2008 };
2009 let with_key_2 = Record {
2010 pk: "get_all_with_2".into(),
2011 partition: 0,
2012 sort_key: Some(SortKey::u32_asc(2)),
2013 data: b"with_key_2".to_vec(),
2014 };
2015
2016 let without_key_1 = Record {
2018 pk: "get_all_without_1".into(),
2019 partition: 0,
2020 sort_key: None,
2021 data: b"without_key_1".to_vec(),
2022 };
2023 let without_key_2 = Record {
2024 pk: "get_all_without_2".into(),
2025 partition: 0,
2026 sort_key: None,
2027 data: b"without_key_2".to_vec(),
2028 };
2029
2030 storage.put(with_key_1).await.expect("put should succeed");
2031 storage.put(without_key_1).await.expect("put should succeed");
2032 storage.put(with_key_2).await.expect("put should succeed");
2033 storage.put(without_key_2).await.expect("put should succeed");
2034
2035 let results = storage
2036 .get_all(0)
2037 .await
2038 .expect("get_all should succeed");
2039
2040 assert_eq!(results.len(), 4, "get_all should return all 4 records");
2041
2042 let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
2044 let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
2045 let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
2046 let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
2047
2048 assert!(has_with_1, "get_all should include with_key_1");
2049 assert!(has_with_2, "get_all should include with_key_2");
2050 assert!(has_without_1, "get_all should include without_key_1");
2051 assert!(has_without_2, "get_all should include without_key_2");
2052
2053 let query_results = storage
2055 .query_sorted(Query::new_full_range(0))
2056 .await
2057 .expect("query should succeed");
2058
2059 assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
2060 let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
2061 assert!(!query_has_without, "query should not include records without sort keys");
2062 }
2063
2064 pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
2066 clear_partitions(storage, &[0, 1]).await.unwrap();
2067
2068 for i in 0..3 {
2070 let record = Record {
2071 pk: format!("partition_0_{}", i).into(),
2072 partition: 0,
2073 sort_key: Some(SortKey::u32_asc(i)),
2074 data: format!("p0_record_{}", i).as_bytes().to_vec(),
2075 };
2076 storage.put(record).await.expect("put should succeed");
2077 }
2078
2079 for i in 0..2 {
2081 let record = Record {
2082 pk: format!("partition_1_{}", i).into(),
2083 partition: 1,
2084 sort_key: Some(SortKey::u32_asc(i)),
2085 data: format!("p1_record_{}", i).as_bytes().to_vec(),
2086 };
2087 storage.put(record).await.expect("put should succeed");
2088 }
2089
2090 let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2092 assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2093 assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2094
2095 let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2097 assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2098 assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2099 }
2100
2101 pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2103 clear_partitions(storage, &[0]).await.unwrap();
2104
2105 for i in 0..3u32 {
2106 let record = Record {
2107 pk: format!("get_all_del_{}", i).into(),
2108 partition: 0,
2109 sort_key: Some(SortKey::u32_asc(i)),
2110 data: format!("record_{}", i).as_bytes().to_vec(),
2111 };
2112 storage.put(record).await.expect("put should succeed");
2113 }
2114
2115 storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2116
2117 let results = storage.get_all(0).await.expect("get_all should succeed");
2118 assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2119
2120 let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2121 assert!(!has_deleted, "deleted record should not appear");
2122 }
2123}