1mod sort;
40
41#[cfg(feature = "filestore")]
42pub mod filestore;
43pub mod memory;
44pub use self::sort::SortKey;
45
46
47use std::ops::RangeBounds;
48
49use anyhow::Context;
50use bitcoin::{Amount, Transaction, Txid};
51use bitcoin::secp256k1::PublicKey;
52use bitcoin::hashes::Hash;
53#[cfg(feature = "onchain-bdk")]
54use bdk_core::Merge;
55#[cfg(feature = "onchain-bdk")]
56use bdk_wallet::ChangeSet;
57use chrono::{DateTime, Local};
58use lightning_invoice::Bolt11Invoice;
59use serde::{de::DeserializeOwned, Serialize};
60
61use ark::lightning::{Invoice, PaymentHash, Preimage};
62use ark::{Vtxo, VtxoId};
63use ark::vtxo::Full;
64use bitcoin_ext::BlockDelta;
65
66use crate::exit::ExitTxOrigin;
67use crate::movement::{
68 Movement, MovementId, MovementStatus, MovementSubsystem, PaymentMethod,
69};
70use crate::persist::BarkPersister;
71use crate::persist::models::{
72 LightningReceive, LightningSend, PendingBoard, PendingOffboard,
73 RoundStateId, SerdeExitChildTx, SerdeRoundState, SerdeVtxo, SerdeVtxoKey, StoredExit,
74 StoredRoundState, Unlocked,
75};
76use crate::round::RoundState;
77use crate::vtxo::{VtxoState, VtxoStateKind};
78use crate::{WalletProperties, WalletVtxo};
79
80
/// Partition identifiers: the one-byte namespace under which each [`Record`] is stored.
pub mod partition {
    /// Wallet properties; this file stores a single record under the empty primary key.
    pub const PROPERTIES: u8 = 0;
    /// Merged BDK wallet changeset; in this file only used behind the `onchain-bdk` feature.
    #[allow(unused)]
    pub const BDK_CHANGESET: u8 = 1;
    /// VTXOs together with their state history, keyed by VTXO id.
    pub const VTXO: u8 = 2;
    /// VTXO key entries (derivation index + public key), keyed by the serialized public key.
    pub const PUBLIC_KEY: u8 = 3;
    /// Pending boards, keyed by the id of the board VTXO.
    pub const PENDING_BOARD: u8 = 4;
    /// Round states, keyed by round state id.
    pub const ROUND_STATE: u8 = 5;
    /// Movements, keyed by movement id.
    pub const MOVEMENT: u8 = 6;
    /// Lightning sends, keyed by payment hash.
    pub const LIGHTNING_SEND: u8 = 7;
    /// Lightning receives, keyed by payment hash.
    pub const LIGHTNING_RECEIVE: u8 = 8;
    /// Exit entries, keyed by VTXO id.
    pub const EXIT_VTXO: u8 = 9;
    /// Exit child transactions, keyed by the exit txid.
    pub const EXIT_CHILD_TX: u8 = 10;
    /// Mailbox checkpoint; a single record under the empty primary key.
    pub const MAILBOX_CHECKPOINT: u8 = 11;
    /// Pending offboards, keyed by movement id.
    pub const PENDING_OFFBOARD: u8 = 12;
    /// Index from serialized payment method to movement id.
    pub const MOVEMENT_PAYMENT_METHOD: u8 = 13;

    /// Bookkeeping partition holding, per partition byte, the last id handed out by
    /// `StorageAdaptor::incremental_id`.
    pub const LAST_IDS: u8 = u8::MAX;
}
101
/// A single stored entry: payload bytes addressed by a partition and a primary key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Record {
    /// Partition the record lives in (see the constants in [`partition`]).
    pub partition: u8,

    /// Primary key; adaptors look records up by `(partition, pk)`.
    pub pk: Vec<u8>,

    /// Key that sorted range queries operate on; `None` if the record has no sort position.
    pub sort_key: Option<SortKey>,

    /// Payload bytes; records built through `Record::from_data` hold `serde_json` output.
    pub data: Vec<u8>,
}
126
127impl Record {
128 fn to_data<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
130 serde_json::from_slice(&self.data).map_err(Into::into)
131 }
132
133 fn from_data<T: Serialize>(
135 partition: u8,
136 pk: &[u8],
137 sort_key: Option<SortKey>,
138 data: &T,
139 ) -> anyhow::Result<Record> {
140 Ok(Record {
141 partition,
142 pk: pk.to_vec(),
143 sort_key,
144 data: serde_json::to_vec(data)?,
145 })
146 }
147}
148
149pub trait QueryRange: RangeBounds<SortKey> + Send {}
151
152impl<R: RangeBounds<SortKey> + Send> QueryRange for R {}
153
/// Description of a sorted range query over one partition.
#[derive(Debug, Clone)]
pub struct Query<R: QueryRange> {
    /// Partition to query.
    pub partition: u8,

    /// Range of sort keys to select.
    pub range: R,

    /// Optional result limit; `None` means no limit was requested.
    pub limit: Option<usize>,
}
166
167impl<R: QueryRange> Query<R> {
168 pub fn new(partition: u8, range: R) -> Self {
170 Self {
171 partition,
172 range,
173 limit: None,
174 }
175 }
176
177 pub fn limit(mut self, limit: usize) -> Self {
182 self.limit = Some(limit);
183 self
184 }
185}
186
187impl Query<std::ops::RangeFull> {
188 pub fn new_full_range(partition: u8) -> Self {
189 Self::new(partition, ..)
190 }
191}
192
193fn serialize_payment_method(pm: &PaymentMethod) -> Vec<u8> {
194 let body = pm.value_string();
195
196 let mut buf = Vec::with_capacity(pm.type_str().len() + 1 + body.len());
197 buf.extend(pm.type_str().as_bytes().iter().copied());
198 buf.push(0xfe);
201 buf.extend(body.into_bytes());
202 buf
203}
204
205#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
252#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
253pub trait StorageAdaptor: Send + Sync + 'static {
254 async fn put(&mut self, record: Record) -> anyhow::Result<()>;
256
257 async fn get(&self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
261
262 async fn delete(&mut self, partition: u8, pk: &[u8]) -> anyhow::Result<Option<Record>>;
266
267 async fn query_sorted<R: QueryRange>(&self, query: Query<R>)
273 -> anyhow::Result<Vec<Record>>;
274
275 async fn get_all(&self, partition: u8) -> anyhow::Result<Vec<Record>>;
279
280 async fn incremental_id(&mut self, partition: u8) -> anyhow::Result<u32> {
282 let last_partition_id = self.get(partition::LAST_IDS, &[partition]).await?
283 .map(|r| r.to_data::<u32>()).unwrap_or(Ok(0))?;
284 let next_partition_id = last_partition_id + 1;
285
286 let record = Record::from_data(
287 partition::LAST_IDS,
288 &[partition],
289 None,
290 &next_partition_id,
291 )?;
292
293 self.put(record).await?;
294 Ok(next_partition_id)
295 }
296}
297
298async fn get_vtxo<S: StorageAdaptor>(adaptor: &S, id: VtxoId) -> anyhow::Result<Option<SerdeVtxo>> {
299 match adaptor.get(partition::VTXO, &id.to_bytes()).await? {
300 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?)),
301 None => Ok(None),
302 }
303}
304
305async fn get_check_vtxo_state<S: StorageAdaptor>(
306 adaptor: &S,
307 vtxo_id: VtxoId,
308 allowed_states: &[VtxoStateKind],
309) -> anyhow::Result<SerdeVtxo> {
310 let vtxo = get_vtxo(adaptor, vtxo_id).await?
311 .context("vtxo not found")?;
312
313 let current_state = vtxo.current_state().context("vtxo has no state")?;
314 if !allowed_states.contains(¤t_state.kind()) {
315 bail!("current state {:?} not in allowed states {:?}",
316 current_state.kind(), allowed_states
317 );
318 }
319
320 Ok(vtxo)
321}
322
323async fn update_vtxo_state_checked<S: StorageAdaptor>(
324 adaptor: &mut S,
325 vtxo_id: VtxoId,
326 new_state: VtxoState,
327 allowed_old_states: &[VtxoStateKind],
328) -> anyhow::Result<WalletVtxo> {
329 let mut serde_vtxo = get_check_vtxo_state(adaptor, vtxo_id, allowed_old_states).await?;
330
331 let sk = sort::vtxo_sort_key(
332 new_state.kind(), serde_vtxo.vtxo.expiry_height(), serde_vtxo.vtxo.amount()
333 );
334
335 serde_vtxo.states.push(new_state.clone());
336 let updated_record = Record::from_data(
337 partition::VTXO,
338 &vtxo_id.to_bytes(),
339 Some(sk),
340 &serde_vtxo,
341 )?;
342
343 adaptor.put(updated_record).await?;
344
345 Ok(WalletVtxo {
346 vtxo: serde_vtxo.vtxo,
347 state: new_state,
348 })
349}
350
/// Wraps a [`StorageAdaptor`] in a tokio `RwLock` so it can back the `BarkPersister`
/// implementation below, whose methods only take `&self`.
pub struct StorageAdaptorWrapper<S: StorageAdaptor> {
    /// The wrapped adaptor; read-only operations take the read lock, mutations the write lock.
    inner: tokio::sync::RwLock<S>,
}
354
355impl<S: StorageAdaptor> StorageAdaptorWrapper<S> {
356 pub fn new(inner: S) -> Self {
357 Self {
358 inner: tokio::sync::RwLock::new(inner),
359 }
360 }
361}
362
363#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
365#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
366impl <S: StorageAdaptor> BarkPersister for StorageAdaptorWrapper<S> {
367 async fn init_wallet(&self, properties: &WalletProperties) -> anyhow::Result<()> {
368 let record = Record::from_data(
369 partition::PROPERTIES,
370 &[],
372 None,
373 properties,
374 )?;
375 self.inner.write().await.put(record).await
376 }
377
378 async fn read_properties(&self) -> anyhow::Result<Option<WalletProperties>> {
379 match self.inner.read().await.get(partition::PROPERTIES, &[]).await? {
380 Some(record) => Ok(Some(record.to_data()?)),
381 None => Ok(None),
382 }
383 }
384
385 async fn set_server_pubkey(&self, server_pubkey: PublicKey) -> anyhow::Result<()> {
386 let mut properties = match self.read_properties().await? {
387 Some(properties) => properties,
388 None => bail!("wallet not initialized"),
389 };
390
391 properties.server_pubkey = Some(server_pubkey);
392
393 let record = Record::from_data(partition::PROPERTIES, &[], None, &properties)?;
394 self.inner.write().await.put(record).await
395 }
396
397 #[cfg(feature = "onchain-bdk")]
398 async fn initialize_bdk_wallet(&self) -> anyhow::Result<ChangeSet> {
399 match self.inner.read().await.get(partition::BDK_CHANGESET, &[]).await? {
400 Some(record) => record.to_data(),
401 None => Ok(ChangeSet::default()),
402 }
403 }
404
405 #[cfg(feature = "onchain-bdk")]
406 async fn store_bdk_wallet_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<()> {
407 let mut current = self.initialize_bdk_wallet().await?;
408 current.merge(changeset.clone());
409
410 let record = Record::from_data(
411 partition::BDK_CHANGESET,
412 &[],
414 None,
415 ¤t,
416 )?;
417 self.inner.write().await.put(record).await
418 }
419
420 async fn create_new_movement(
421 &self,
422 status: MovementStatus,
423 subsystem: &MovementSubsystem,
424 time: DateTime<Local>,
425 ) -> anyhow::Result<MovementId> {
426 let mut lock = self.inner.write().await;
427
428 let id = MovementId(lock.incremental_id(partition::MOVEMENT).await?);
429 let movement = Movement::new(id, status, subsystem, time);
430
431 let record = Record::from_data(
432 partition::MOVEMENT,
433 &id.to_bytes(),
434 Some(sort::movement_sort_key(&time)),
435 &movement,
436 )?;
437 lock.put(record).await?;
438
439 Ok(id)
440 }
441
442 async fn update_movement(&self, movement: &Movement) -> anyhow::Result<()> {
443 let mut guard = self.inner.write().await;
444
445 let record = Record::from_data(
446 partition::MOVEMENT,
447 &movement.id.to_bytes(),
448 Some(sort::movement_sort_key(&movement.time.created_at)),
449 movement,
450 )?;
451 guard.put(record).await?;
452
453 let sent = movement.sent_to.iter().map(|d| &d.destination);
455 let rcvd = movement.received_on.iter().map(|d| &d.destination);
456 for pm in sent.chain(rcvd) {
457 let pm_bytes = serialize_payment_method(pm);
458 let primary_key = {
459 let mut buf = Vec::with_capacity(pm_bytes.len() + 4);
461 buf.extend(pm_bytes.iter().copied());
462 buf.extend(movement.id.to_bytes());
463 buf
464 };
465 let record = Record::from_data(
466 partition::MOVEMENT_PAYMENT_METHOD,
467 &primary_key,
468 Some(SortKey::from_bytes(pm_bytes)),
469 &movement.id.0,
470 )?;
471 guard.put(record).await?;
472 }
473
474 Ok(())
475 }
476
477 async fn get_movement_by_id(&self, movement_id: MovementId) -> anyhow::Result<Movement> {
478 self.inner.read().await.get(partition::MOVEMENT, &movement_id.to_bytes())
479 .await?
480 .context("movement not found")?
481 .to_data()
482 }
483
484 async fn get_all_movements(&self) -> anyhow::Result<Vec<Movement>> {
485 let records = self.inner.read().await
486 .query_sorted(Query::new_full_range(partition::MOVEMENT)).await?;
487 records.into_iter().map(|r| r.to_data()).collect()
488 }
489
490 async fn get_movements_by_payment_method(
491 &self,
492 payment_method: &PaymentMethod,
493 ) -> anyhow::Result<Vec<Movement>> {
494 let pm_bytes = serialize_payment_method(payment_method);
495 let sort_key = SortKey::from_bytes(pm_bytes);
496
497 let guard = self.inner.read().await;
498 let idx_recs = guard.query_sorted(Query::new(
499 partition::MOVEMENT_PAYMENT_METHOD,
500 sort_key.clone()..=sort_key,
501 )).await?;
502
503 let mut ret = Vec::with_capacity(idx_recs.len());
504 for idx_rec in idx_recs {
505 let id = MovementId::new(idx_rec.to_data::<u32>()
506 .context("corrupt db: movement payment method index value")?);
507
508 ret.push(
509 guard.get(partition::MOVEMENT, &id.to_bytes()).await?
510 .context("corrupt db: movement payment method entry for unknown movement")?
511 .to_data()
512 .context("corrupt db: invalid movement record")?
513 );
514 }
515 Ok(ret)
516 }
517
518 async fn store_pending_board(
519 &self,
520 vtxo: &Vtxo<Full>,
521 funding_tx: &Transaction,
522 movement_id: MovementId,
523 ) -> anyhow::Result<()> {
524 let pending_board = PendingBoard {
525 vtxos: vec![vtxo.id()],
526 amount: vtxo.amount(),
527 funding_tx: funding_tx.clone(),
528 movement_id,
529 };
530
531 let record = Record::from_data(
532 partition::PENDING_BOARD,
533 &vtxo.id().to_bytes(),
534 None,
535 &pending_board,
536 )?;
537
538 self.inner.write().await.put(record).await
539 }
540
541 async fn remove_pending_board(&self, vtxo_id: &VtxoId) -> anyhow::Result<()> {
542 self.inner.write().await.delete(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await?;
543 Ok(())
544 }
545
546 async fn get_all_pending_board_ids(&self) -> anyhow::Result<Vec<VtxoId>> {
547 let records = self.inner.read().await.get_all(partition::PENDING_BOARD).await?;
548 records
549 .into_iter()
550 .map(|r| {
551 let board = r.to_data::<PendingBoard>()?;
552 Ok(board.vtxos.into_iter().next().context("empty vtxos")?)
553 })
554 .collect()
555 }
556
557 async fn get_pending_board_by_vtxo_id(
558 &self,
559 vtxo_id: VtxoId,
560 ) -> anyhow::Result<Option<PendingBoard>> {
561 match self.inner.read().await.get(partition::PENDING_BOARD, &vtxo_id.to_bytes()).await? {
562 Some(record) => Ok(Some(record.to_data()?)),
563 None => Ok(None),
564 }
565 }
566
567 async fn store_round_state_lock_vtxos(
568 &self,
569 round_state: &RoundState,
570 ) -> anyhow::Result<RoundStateId> {
571 let mut lock = self.inner.write().await;
572
573 let id = RoundStateId(lock.incremental_id(partition::ROUND_STATE).await?);
574
575 let allowed_states = &[VtxoStateKind::Spendable];
576
577 for vtxo in round_state.participation().inputs.iter() {
579 get_check_vtxo_state(&mut *lock, vtxo.id(), allowed_states).await?;
580 }
581
582 for vtxo in round_state.participation().inputs.iter() {
583 update_vtxo_state_checked(
584 &mut *lock,
585 vtxo.id(),
586 VtxoState::Locked { movement_id: round_state.movement_id },
587 allowed_states,
588 ).await?;
589 }
590
591 let serde_state = SerdeRoundState::from(round_state);
592 let record = Record::from_data(
593 partition::ROUND_STATE,
594 &id.to_bytes(),
595 Some(sort::SortKey::u32_asc(id.0)),
596 &serde_state,
597 )?;
598 lock.put(record).await?;
599
600 Ok(id)
601 }
602
603 async fn update_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
604 let serde_state = SerdeRoundState::from(round_state.state());
605 let record = Record::from_data(
606 partition::ROUND_STATE,
607 &round_state.id().to_bytes(),
608 Some(sort::SortKey::u32_asc(round_state.id().0)),
609 &serde_state,
610 )?;
611 self.inner.write().await.put(record).await
612 }
613
614 async fn remove_round_state(&self, round_state: &StoredRoundState) -> anyhow::Result<()> {
615 self.inner.write().await
616 .delete(partition::ROUND_STATE, &round_state.id().to_bytes()).await?;
617 Ok(())
618 }
619
620 async fn get_round_state_by_id(&self, _id: RoundStateId) -> anyhow::Result<Option<StoredRoundState<Unlocked>>> {
621 let record = self.inner.read().await
622 .get(partition::ROUND_STATE, &_id.to_bytes()).await?;
623 match record {
624 Some(r) => {
625 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
626 let id = RoundStateId(u32::from_be_bytes(pk_slice));
627 let state = r.to_data::<SerdeRoundState>()?.into();
628 Ok(Some(StoredRoundState::new(id, state)))
629 },
630 None => Ok(None),
631 }
632 }
633
634 async fn get_pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
635 let records = self.inner.read().await
636 .get_all(partition::ROUND_STATE).await?;
637 records.into_iter()
638 .map(|r| {
639 let pk_slice: [u8; 4] = r.pk[..4].try_into().expect("4 bytes shouldn't fail");
640 Ok(RoundStateId(u32::from_be_bytes(pk_slice)))
641 })
642 .collect()
643 }
644
645 async fn store_vtxos(&self, vtxos: &[(&Vtxo<Full>, &VtxoState)]) -> anyhow::Result<()> {
646 let mut lock = self.inner.write().await;
647
648 for (vtxo, state) in vtxos {
649 let serde_vtxo = SerdeVtxo {
650 vtxo: (*vtxo).clone(),
651 states: vec![(*state).clone()],
652 };
653
654 let sk = sort::vtxo_sort_key(
655 state.kind(), vtxo.expiry_height(), vtxo.amount(),
656 );
657 let record = Record::from_data(
658 partition::VTXO,
659 &vtxo.id().to_bytes(),
660 Some(sk),
661 &serde_vtxo,
662 )?;
663 lock.put(record).await?;
664 }
665 Ok(())
666 }
667
668 async fn get_wallet_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<WalletVtxo>> {
669 let lock = self.inner.read().await;
670 match get_vtxo(&*lock, id).await? {
671 Some(vtxo) => Ok(Some(WalletVtxo {
672 state: vtxo.current_state().context("vtxo has no state")?.clone(),
673 vtxo: vtxo.vtxo,
674 })),
675 None => Ok(None),
676 }
677 }
678
679 async fn get_all_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
680 let records = self.inner.read().await
681 .query_sorted(Query::new_full_range(partition::VTXO)).await?;
682
683 records
684 .into_iter()
685 .map(|r| {
686 let serde_vtxo = r.to_data::<SerdeVtxo>()?;
687 let state = serde_vtxo
688 .current_state()
689 .cloned()
690 .context("vtxo has no state")?;
691 Ok(WalletVtxo {
692 vtxo: serde_vtxo.vtxo,
693 state,
694 })
695 })
696 .collect()
697 }
698
699 async fn get_vtxos_by_state(
700 &self,
701 states: &[VtxoStateKind],
702 ) -> anyhow::Result<Vec<WalletVtxo>> {
703 let lock = self.inner.read().await;
704
705 let range = |state: VtxoStateKind| {
706 let start = sort::vtxo_sort_key(state, u32::MIN, Amount::ZERO);
707 let end = sort::vtxo_sort_key(state, u32::MAX, Amount::MAX);
708 (start, end)
709 };
710
711 let mut records = Vec::new();
712 for state in states {
713 let (start, end) = range(*state);
714 let query = Query::new(partition::VTXO, start..=end);
715
716 for record in lock.query_sorted(query).await? {
717 let serde_vtxo = record.to_data::<SerdeVtxo>()?;
718 let current_state = serde_vtxo.current_state()
719 .context("vtxo has no current state")?.clone();
720 debug_assert_eq!(current_state.kind(), *state);
721 records.push(WalletVtxo {
722 vtxo: serde_vtxo.vtxo,
723 state: current_state,
724 });
725 }
726 }
727
728 Ok(records)
729 }
730
731 async fn remove_vtxo(&self, id: VtxoId) -> anyhow::Result<Option<Vtxo<Full>>> {
732 match self.inner.write().await.delete(partition::VTXO, &id.to_bytes()).await? {
733 Some(record) => Ok(Some(record.to_data::<SerdeVtxo>()?.vtxo)),
734 None => Ok(None),
735 }
736 }
737
738 async fn has_spent_vtxo(&self, id: VtxoId) -> anyhow::Result<bool> {
739 match self.get_wallet_vtxo(id).await? {
740 Some(vtxo) => Ok(vtxo.state.kind() == VtxoStateKind::Spent),
741 None => Ok(false),
742 }
743 }
744
745 async fn update_vtxo_state_checked(
746 &self,
747 vtxo_id: VtxoId,
748 new_state: VtxoState,
749 allowed_old_states: &[VtxoStateKind],
750 ) -> anyhow::Result<WalletVtxo> {
751 let mut lock = self.inner.write().await;
752 update_vtxo_state_checked(&mut *lock, vtxo_id, new_state, allowed_old_states).await
753 }
754
755 async fn store_vtxo_key(&self, index: u32, public_key: PublicKey) -> anyhow::Result<()> {
756 let vtxo_key = SerdeVtxoKey { index, public_key };
757 let record = Record::from_data(
758 partition::PUBLIC_KEY,
759 &public_key.serialize()[..],
760 Some(sort::SortKey::u64_desc(index as u64)),
761 &vtxo_key,
762 )?;
763 self.inner.write().await.put(record).await
764 }
765
766 async fn get_last_vtxo_key_index(&self) -> anyhow::Result<Option<u32>> {
767 let query = Query::new_full_range(partition::PUBLIC_KEY).limit(1);
769 let records = self.inner.read().await.query_sorted(query).await?;
770
771 match records.into_iter().next() {
772 Some(record) => {
773 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
774 Ok(Some(vtxo_key.index))
775 }
776 None => Ok(None),
777 }
778 }
779
780 async fn get_public_key_idx(&self, public_key: &PublicKey) -> anyhow::Result<Option<u32>> {
781 match self.inner.read().await
782 .get(partition::PUBLIC_KEY, &public_key.serialize()[..]).await?
783 {
784 Some(record) => {
785 let vtxo_key = record.to_data::<SerdeVtxoKey>()?;
786 Ok(Some(vtxo_key.index))
787 }
788 None => Ok(None),
789 }
790 }
791
792 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
793 match self.inner.read().await
794 .get(partition::MAILBOX_CHECKPOINT, &[]).await?
795 {
796 Some(record) => Ok(record.to_data::<u64>()?),
797 None => Ok(0),
798 }
799 }
800
801 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
802 let mut lock = self.inner.write().await;
803 let record = Record::from_data(
804 partition::MAILBOX_CHECKPOINT,
805 &[],
806 None,
807 &checkpoint,
808 )?;
809 lock.put(record).await?;
810 Ok(())
811 }
812
813 async fn store_new_pending_lightning_send(
814 &self,
815 invoice: &Invoice,
816 amount: Amount,
817 fee: Amount,
818 vtxo_ids: &[VtxoId],
819 movement_id: MovementId,
820 ) -> anyhow::Result<LightningSend> {
821 let mut lock = self.inner.write().await;
822 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
823 for vtxo_id in vtxo_ids {
824 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
825 .context("vtxo not found")?;
826 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
827 }
828
829 let lightning_send = LightningSend {
830 invoice: invoice.clone(),
831 amount,
832 fee,
833 htlc_vtxos,
834 preimage: None,
835 movement_id,
836 finished_at: None,
837 };
838
839 let record = Record::from_data(
840 partition::LIGHTNING_SEND,
841 &invoice.payment_hash().to_byte_array(),
842 None,
843 &lightning_send,
844 )?;
845
846 lock.put(record).await?;
847
848 Ok(lightning_send)
849 }
850
851 async fn get_all_pending_lightning_send(&self) -> anyhow::Result<Vec<LightningSend>> {
852 let records = self.inner.read().await
853 .get_all(partition::LIGHTNING_SEND).await?;
854 records
855 .into_iter()
856 .filter_map(|r| {
857 let send = r.to_data::<LightningSend>().ok()?;
858 if send.finished_at.is_none() {
859 Some(Ok(send))
860 } else {
861 None
862 }
863 })
864 .collect()
865 }
866
867 async fn finish_lightning_send(
868 &self,
869 payment_hash: PaymentHash,
870 preimage: Option<Preimage>,
871 ) -> anyhow::Result<()> {
872 let mut lock = self.inner.write().await;
873
874 let pk = payment_hash.to_byte_array();
875 let record = lock
876 .get(partition::LIGHTNING_SEND, &pk).await?.context("lightning send not found")?;
877 let mut lightning_send: LightningSend = record.to_data()?;
878
879 lightning_send.preimage = preimage;
880 lightning_send.finished_at = Some(Local::now());
881
882 let updated_record = Record::from_data(
883 partition::LIGHTNING_SEND,
884 &pk,
885 None,
886 &lightning_send,
887 )?;
888 lock.put(updated_record).await?;
889
890 Ok(())
891 }
892
893 async fn remove_lightning_send(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
894 self.inner.write().await.delete(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?;
895 Ok(())
896 }
897
898 async fn get_lightning_send(
899 &self,
900 payment_hash: PaymentHash,
901 ) -> anyhow::Result<Option<LightningSend>> {
902 match self.inner.read().await
903 .get(partition::LIGHTNING_SEND, &payment_hash.to_byte_array()).await?
904 {
905 Some(record) => Ok(Some(record.to_data()?)),
906 None => Ok(None),
907 }
908 }
909
910 async fn store_lightning_receive(
911 &self,
912 payment_hash: PaymentHash,
913 preimage: Preimage,
914 invoice: &Bolt11Invoice,
915 htlc_recv_cltv_delta: BlockDelta,
916 ) -> anyhow::Result<()> {
917 let lightning_receive = LightningReceive {
918 payment_hash,
919 payment_preimage: preimage,
920 invoice: invoice.clone(),
921 htlc_recv_cltv_delta,
922 htlc_vtxos: vec![],
923 movement_id: None,
924 finished_at: None,
925 preimage_revealed_at: None,
926 };
927
928 let record = Record::from_data(
929 partition::LIGHTNING_RECEIVE,
930 &payment_hash.to_byte_array(),
931 None,
932 &lightning_receive,
933 )?;
934 self.inner.write().await.put(record).await
935 }
936
937 async fn get_all_pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
938 let records = self.inner.read().await
939 .get_all(partition::LIGHTNING_RECEIVE).await?;
940 records
941 .into_iter()
942 .filter_map(|r| {
943 let receive = r.to_data::<LightningReceive>().ok()?;
944 if receive.finished_at.is_none() {
945 Some(Ok(receive))
946 } else {
947 None
948 }
949 })
950 .collect()
951 }
952
953 async fn set_preimage_revealed(&self, payment_hash: PaymentHash) -> anyhow::Result<()> {
954 let mut lock = self.inner.write().await;
955
956 let pk = payment_hash.to_byte_array();
957 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
958 .context("lightning receive not found")?;
959 let mut lightning_receive: LightningReceive = record.to_data()?;
960
961 lightning_receive.preimage_revealed_at = Some(Local::now());
962
963 let updated_record = Record::from_data(
964 partition::LIGHTNING_RECEIVE,
965 &pk,
966 None,
967 &lightning_receive,
968 )?;
969 lock.put(updated_record).await
970 }
971
972 async fn update_lightning_receive(
973 &self,
974 payment_hash: PaymentHash,
975 vtxo_ids: &[VtxoId],
976 movement_id: MovementId,
977 ) -> anyhow::Result<()> {
978 let mut lock = self.inner.write().await;
979 let pk = payment_hash.to_byte_array();
980 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
981 .context("lightning receive not found")?;
982 let mut lightning_receive: LightningReceive = record.to_data()?;
983
984 let mut htlc_vtxos = Vec::with_capacity(vtxo_ids.len());
985 for vtxo_id in vtxo_ids {
986 let vtxo = get_vtxo(&*lock, *vtxo_id).await?
987 .context("vtxo not found")?;
988 htlc_vtxos.push(vtxo.to_wallet_vtxo()?);
989 }
990
991 lightning_receive.htlc_vtxos = htlc_vtxos;
992 lightning_receive.movement_id = Some(movement_id);
993
994 let updated_record = Record::from_data(
995 partition::LIGHTNING_RECEIVE,
996 &pk,
997 None,
998 &lightning_receive,
999 )?;
1000 lock.put(updated_record).await
1001 }
1002
1003 async fn fetch_lightning_receive_by_payment_hash(
1004 &self,
1005 payment_hash: PaymentHash,
1006 ) -> anyhow::Result<Option<LightningReceive>> {
1007 match self.inner.read().await
1008 .get(partition::LIGHTNING_RECEIVE, &payment_hash.to_byte_array()).await?
1009 {
1010 Some(record) => Ok(Some(record.to_data()?)),
1011 None => Ok(None),
1012 }
1013 }
1014
1015 async fn finish_pending_lightning_receive(
1016 &self,
1017 payment_hash: PaymentHash,
1018 ) -> anyhow::Result<()> {
1019 let mut lock = self.inner.write().await;
1020 let pk = payment_hash.to_byte_array();
1021 let record = lock.get(partition::LIGHTNING_RECEIVE, &pk).await?
1022 .context("lightning receive not found")?;
1023 let mut lightning_receive: LightningReceive = record.to_data()?;
1024
1025 lightning_receive.finished_at = Some(Local::now());
1026
1027 let updated_record = Record::from_data(
1028 partition::LIGHTNING_RECEIVE,
1029 &pk,
1030 None,
1031 &lightning_receive,
1032 )?;
1033 lock.put(updated_record).await
1034 }
1035
1036 async fn store_pending_offboard(&self, pending: &PendingOffboard) -> anyhow::Result<()> {
1037 let record = Record::from_data(
1038 partition::PENDING_OFFBOARD,
1039 &pending.movement_id.to_bytes(),
1040 None,
1041 pending,
1042 )?;
1043 self.inner.write().await.put(record).await
1044 }
1045
1046 async fn get_pending_offboards(&self) -> anyhow::Result<Vec<PendingOffboard>> {
1047 let records = self.inner.read().await
1048 .get_all(partition::PENDING_OFFBOARD).await?;
1049 records.into_iter().map(|r| r.to_data()).collect()
1050 }
1051
1052 async fn remove_pending_offboard(&self, movement_id: MovementId) -> anyhow::Result<()> {
1053 self.inner.write().await
1054 .delete(partition::PENDING_OFFBOARD, &movement_id.to_bytes()).await?;
1055 Ok(())
1056 }
1057
1058 async fn store_exit_vtxo_entry(&self, exit: &StoredExit) -> anyhow::Result<()> {
1059 let record = Record::from_data(
1060 partition::EXIT_VTXO,
1061 &exit.vtxo_id.to_bytes(),
1062 None,
1063 exit,
1064 )?;
1065 self.inner.write().await.put(record).await
1066 }
1067
1068 async fn remove_exit_vtxo_entry(&self, id: &VtxoId) -> anyhow::Result<()> {
1069 self.inner.write().await.delete(partition::EXIT_VTXO, &id.to_bytes()).await?;
1070 Ok(())
1071 }
1072
1073 async fn get_exit_vtxo_entries(&self) -> anyhow::Result<Vec<StoredExit>> {
1074 let records = self.inner.read().await.get_all(partition::EXIT_VTXO).await?;
1075 records.into_iter().map(|r| r.to_data()).collect()
1076 }
1077
1078 async fn store_exit_child_tx(
1079 &self,
1080 exit_txid: Txid,
1081 child_tx: &Transaction,
1082 origin: ExitTxOrigin,
1083 ) -> anyhow::Result<()> {
1084 let exit_child = SerdeExitChildTx {
1085 child_tx: child_tx.clone(),
1086 origin,
1087 };
1088 let record = Record::from_data(
1089 partition::EXIT_CHILD_TX,
1090 &exit_txid.to_byte_array(),
1091 None,
1092 &exit_child,
1093 )?;
1094 self.inner.write().await.put(record).await
1095 }
1096
1097 async fn get_exit_child_tx(
1098 &self,
1099 exit_txid: Txid,
1100 ) -> anyhow::Result<Option<(Transaction, ExitTxOrigin)>> {
1101 match self.inner.read().await
1102 .get(partition::EXIT_CHILD_TX, &exit_txid.to_byte_array()).await?
1103 {
1104 Some(record) => {
1105 let exit_child = record.to_data::<SerdeExitChildTx>()?;
1106 Ok(Some((exit_child.child_tx, exit_child.origin)))
1107 }
1108 None => Ok(None),
1109 }
1110 }
1111}
1112
#[cfg(test)]
mod tests {
    use super::*;

    /// The query builder keeps the partition, uses the full range and records the limit.
    #[test]
    fn storage_query_builder() {
        let Query { partition, range, limit } = Query::new_full_range(0).limit(10);

        assert_eq!(partition, 0);
        assert_eq!(limit, Some(10));
        assert_eq!(range, ..);
    }
}
1126
1127#[cfg(test)]
1142pub mod test_suite {
1143 use super::*;
1144 use super::partition::LAST_IDS;
1145 use super::sort::SortKey;
1146
1147 async fn clear_partitions<S: StorageAdaptor>(storage: &mut S, partitions: &[u8]) -> anyhow::Result<()> {
1148 for partition in partitions {
1149 let records = storage.get_all(*partition).await?;
1150 for record in records {
1151 storage.delete(record.partition, &record.pk).await?;
1152 }
1153 }
1154 Ok(())
1155 }
1156
    /// Runs every check of this suite, in order, against the same `storage` instance.
    ///
    /// Each check panics on failure. Because the storage is shared, records written by
    /// earlier checks are still present when later ones run.
    pub async fn run_all<S: StorageAdaptor>(storage: &mut S) {
        // put
        test_put_insert(storage).await;
        test_put_upsert(storage).await;
        test_put_with_sort_key(storage).await;
        test_put_without_sort_key(storage).await;
        test_put_multiple_partitions(storage).await;

        // get
        test_get_existing(storage).await;
        test_get_after_update(storage).await;

        // delete
        test_delete_existing(storage).await;
        test_delete_nonexistent(storage).await;
        test_delete_idempotent(storage).await;

        // query_sorted
        test_query_empty_partition(storage).await;
        test_query_returns_partition_records(storage).await;
        test_query_ordering(storage).await;
        test_query_with_limit(storage).await;
        test_query_null_sort_key_excluded(storage).await;
        test_query_partition_isolation(storage).await;
        test_query_range(storage).await;
        test_query_exclusive_end_range(storage).await;
        test_query_full_range_limit_one(storage).await;

        // get_all
        test_get_all_empty_partition(storage).await;
        test_get_all_returns_all_records(storage).await;
        test_get_all_includes_records_without_sort_key(storage).await;
        test_get_all_partition_isolation(storage).await;
        test_get_all_after_delete(storage).await;

        // incremental_id
        test_incremental_id_starts_at_one(storage).await;
        test_incremental_id_increments(storage).await;
        test_incremental_id_partition_isolation(storage).await;
        test_incremental_id_persists_across_operations(storage).await;
    }
1199
1200 pub async fn test_put_insert<S: StorageAdaptor>(storage: &mut S) {
1202 let record = Record {
1203 pk: "put_insert_1".into(),
1204 partition: 0,
1205 sort_key: None,
1206 data: b"test data".to_vec(),
1207 };
1208
1209 storage.put(record).await.expect("put should succeed");
1210
1211 let retrieved = storage
1212 .get(0, b"put_insert_1")
1213 .await
1214 .expect("get should succeed")
1215 .expect("record should exist");
1216
1217 assert_eq!(retrieved.pk, b"put_insert_1");
1218 assert_eq!(retrieved.partition, 0);
1219 assert_eq!(retrieved.data, b"test data");
1220 }
1221
1222 pub async fn test_put_upsert<S: StorageAdaptor>(storage: &mut S) {
1224 let record1 = Record {
1225 pk: b"put_upsert_1".into(),
1226 partition: 0,
1227 sort_key: None,
1228 data: b"original".to_vec(),
1229 };
1230 storage.put(record1).await.expect("first put should succeed");
1231
1232 let record2 = Record {
1233 pk: "put_upsert_1".into(),
1234 partition: 0,
1235 sort_key: None,
1236 data: b"updated".to_vec(),
1237 };
1238 storage
1239 .put(record2)
1240 .await
1241 .expect("second put should succeed");
1242
1243 let retrieved = storage
1244 .get(0, b"put_upsert_1")
1245 .await
1246 .expect("get should succeed")
1247 .expect("record should exist");
1248
1249 assert_eq!(retrieved.data, b"updated", "data should be updated");
1250 }
1251
1252 pub async fn test_put_with_sort_key<S: StorageAdaptor>(storage: &mut S) {
1254 let sort_key = SortKey::u32_asc(42);
1255 let record = Record {
1256 pk: b"put_sort_key_1".into(),
1257 partition: 0,
1258 sort_key: Some(sort_key.clone()),
1259 data: b"with sort key".to_vec(),
1260 };
1261
1262 storage.put(record).await.expect("put should succeed");
1263
1264 let retrieved = storage
1265 .get(0, b"put_sort_key_1")
1266 .await
1267 .expect("get should succeed")
1268 .expect("record should exist");
1269
1270 assert_eq!(retrieved.sort_key, Some(sort_key));
1271 }
1272
1273 pub async fn test_put_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1275 let record = Record {
1276 pk: b"put_no_sort_key_1".into(),
1277 partition: 0,
1278 sort_key: None,
1279 data: b"no sort key".to_vec(),
1280 };
1281
1282 storage.put(record).await.expect("put should succeed");
1283
1284 let retrieved = storage
1285 .get(0, b"put_no_sort_key_1")
1286 .await
1287 .expect("get should succeed")
1288 .expect("record should exist");
1289
1290 assert!(retrieved.sort_key.is_none());
1291 }
1292
1293 pub async fn test_put_multiple_partitions<S: StorageAdaptor>(storage: &mut S) {
1295 let record_a = Record {
1296 pk: "put_multi_a".into(),
1297 partition: 0,
1298 sort_key: None,
1299 data: b"in partition a".to_vec(),
1300 };
1301 let record_b = Record {
1302 pk: "put_multi_b".into(),
1303 partition: 1,
1304 sort_key: None,
1305 data: b"in partition b".to_vec(),
1306 };
1307
1308 storage.put(record_a).await.expect("put a should succeed");
1309 storage.put(record_b).await.expect("put b should succeed");
1310
1311 let retrieved_a = storage
1312 .get(0, b"put_multi_a")
1313 .await
1314 .expect("get should succeed")
1315 .expect("record a should exist");
1316 let retrieved_b = storage
1317 .get(1, b"put_multi_b")
1318 .await
1319 .expect("get should succeed")
1320 .expect("record b should exist");
1321
1322 assert_eq!(retrieved_a.partition, 0);
1323 assert_eq!(retrieved_b.partition, 1);
1324 }
1325
1326 pub async fn test_get_existing<S: StorageAdaptor>(storage: &mut S) {
1328 let record = Record {
1329 pk: b"get_existing_1".into(),
1330 partition: 0,
1331 sort_key: Some(SortKey::u32_asc(100)),
1332 data: b"test".to_vec(),
1333 };
1334 storage.put(record).await.expect("put should succeed");
1335
1336 let retrieved = storage
1337 .get(0, b"get_existing_1")
1338 .await
1339 .expect("get should succeed");
1340
1341 assert!(retrieved.is_some());
1342 let retrieved = retrieved.unwrap();
1343 assert_eq!(retrieved.pk, b"get_existing_1");
1344 assert_eq!(retrieved.partition, 0);
1345 assert_eq!(retrieved.data, b"test");
1346
1347 assert!(storage.get(0, b"get_existing_1_").await.unwrap().is_none());
1349 assert!(storage.get(0, b"get_existing_").await.unwrap().is_none());
1351
1352 assert!(storage.get(0, b"get_nonexistent_does_not_exist").await.unwrap().is_none());
1354 }
1355
1356 pub async fn test_get_after_update<S: StorageAdaptor>(storage: &mut S) {
1358 let record1 = Record {
1359 pk: b"get_after_update_1".into(),
1360 partition: 0,
1361 sort_key: None,
1362 data: b"version1".to_vec(),
1363 };
1364 storage.put(record1).await.expect("put should succeed");
1365
1366 let record2 = Record {
1367 pk: b"get_after_update_1".into(),
1368 partition: 0,
1369 sort_key: None,
1370 data: b"version2".to_vec(),
1371 };
1372 storage.put(record2).await.expect("put should succeed");
1373
1374 let retrieved = storage
1375 .get(0, b"get_after_update_1")
1376 .await
1377 .expect("get should succeed")
1378 .expect("record should exist");
1379
1380 assert_eq!(retrieved.data, b"version2");
1381 }
1382
1383 pub async fn test_delete_existing<S: StorageAdaptor>(storage: &mut S) {
1385 let record = Record {
1386 pk: b"delete_existing_1".into(),
1387 partition: 0,
1388 sort_key: None,
1389 data: b"to delete".to_vec(),
1390 };
1391 storage.put(record.clone()).await.expect("put should succeed");
1392
1393 let deleted_record = storage
1394 .delete(0, b"delete_existing_1")
1395 .await
1396 .expect("delete should succeed");
1397
1398 assert_eq!(deleted_record, Some(record));
1399
1400 let retrieved = storage
1401 .get(0, b"delete_existing_1")
1402 .await
1403 .expect("get should succeed");
1404 assert!(retrieved.is_none(), "record should no longer exist");
1405 }
1406
1407 pub async fn test_delete_nonexistent<S: StorageAdaptor>(storage: &mut S) {
1409 let deleted_record = storage
1410 .delete(0, b"delete_nonexistent_does_not_exist")
1411 .await
1412 .expect("delete should succeed");
1413
1414 assert!(
1415 deleted_record.is_none(),
1416 "delete should return None for non-existent record"
1417 );
1418 }
1419
1420 pub async fn test_delete_idempotent<S: StorageAdaptor>(storage: &mut S) {
1422 let record = Record {
1423 pk: b"delete_idempotent_1".into(),
1424 partition: 0,
1425 sort_key: None,
1426 data: b"delete twice".to_vec(),
1427 };
1428 storage.put(record.clone()).await.expect("put should succeed");
1429
1430 let first_delete = storage
1431 .delete(0, b"delete_idempotent_1")
1432 .await
1433 .expect("first delete should succeed");
1434 let second_delete = storage
1435 .delete(0, b"delete_idempotent_1")
1436 .await
1437 .expect("second delete should succeed");
1438
1439 assert_eq!(first_delete, Some(record), "first delete should return the record");
1440 assert_eq!(second_delete, None, "second delete should return None");
1441 }
1442
1443 pub async fn test_query_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1445 clear_partitions(storage, &[0]).await.unwrap();
1446 let results = storage
1447 .query_sorted(Query::new_full_range(0))
1448 .await
1449 .expect("query should succeed");
1450
1451 assert!(results.is_empty());
1452 }
1453
1454 pub async fn test_query_returns_partition_records<S: StorageAdaptor>(storage: &mut S) {
1456 clear_partitions(storage, &[0]).await.unwrap();
1457 for i in 0..3 {
1458 let record = Record {
1459 pk: format!("query_partition_{}", i).into(),
1460 partition: 0,
1461 sort_key: Some(SortKey::u32_asc(i)),
1462 data: format!("record_{}", i).as_bytes().to_vec(),
1463 };
1464 storage.put(record).await.expect("put should succeed");
1465 }
1466
1467 let results = storage
1468 .query_sorted(Query::new_full_range(0))
1469 .await
1470 .expect("query should succeed");
1471
1472 assert_eq!(results.len(), 3);
1473 }
1474
1475 pub async fn test_query_ordering<S: StorageAdaptor>(storage: &mut S) {
1477 clear_partitions(storage, &[0]).await.unwrap();
1478 for i in [5, 2, 8, 1, 9] {
1480 let record = Record {
1481 pk: format!("query_asc_{}", i).into(),
1482 partition: 0,
1483 sort_key: Some(SortKey::u32_asc(i)),
1484 data: format!("record_{}", i).as_bytes().to_vec(),
1485 };
1486 storage.put(record).await.expect("put should succeed");
1487 }
1488
1489 let results = storage
1490 .query_sorted(Query::new_full_range(0))
1491 .await
1492 .expect("query should succeed");
1493
1494 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1495 assert_eq!(
1496 values,
1497 vec![b"record_1".to_vec(), b"record_2".to_vec(), b"record_5".to_vec(), b"record_8".to_vec(), b"record_9".to_vec()],
1498 "should be in ascending order"
1499 );
1500 }
1501
1502 pub async fn test_query_with_limit<S: StorageAdaptor>(storage: &mut S) {
1504 clear_partitions(storage, &[0]).await.unwrap();
1505 for i in 0..10 {
1506 let record = Record {
1507 pk: format!("query_limit_{}", i).into(),
1508 partition: 0,
1509 sort_key: Some(SortKey::u32_asc(i)),
1510 data: format!("record_{}", i).as_bytes().to_vec(),
1511 };
1512 storage.put(record).await.expect("put should succeed");
1513 }
1514
1515 let results = storage
1516 .query_sorted(Query::new_full_range(0).limit(3))
1517 .await
1518 .expect("query should succeed");
1519
1520 assert_eq!(results.len(), 3);
1521 let values = results.iter().map(|r| r.data.clone()).collect::<Vec<_>>();
1522 assert_eq!(
1523 values,
1524 vec![b"record_0".to_vec(), b"record_1".to_vec(), b"record_2".to_vec()],
1525 "should return first 3 records"
1526 );
1527 }
1528
1529 pub async fn test_query_null_sort_key_excluded<S: StorageAdaptor>(storage: &mut S) {
1531 clear_partitions(storage, &[0]).await.unwrap();
1532 let with_key_1 = Record {
1534 pk: "query_null_with_1".into(),
1535 partition: 0,
1536 sort_key: Some(SortKey::u32_asc(1)),
1537 data: b"with_key_1".to_vec(),
1538 };
1539 let with_key_2 = Record {
1540 pk: "query_null_with_2".into(),
1541 partition: 0,
1542 sort_key: Some(SortKey::u32_asc(2)),
1543 data: b"with_key_2".to_vec(),
1544 };
1545
1546 let without_key = Record {
1548 pk: "query_null_without".into(),
1549 partition: 0,
1550 sort_key: None,
1551 data: b"no_key".to_vec(),
1552 };
1553
1554 storage.put(with_key_1).await.expect("put should succeed");
1555 storage.put(without_key).await.expect("put should succeed");
1556 storage.put(with_key_2).await.expect("put should succeed");
1557
1558 let results_query = storage.query_sorted(Query::new_full_range(0)).await
1560 .expect("query should succeed");
1561 assert_eq!(results_query.len(), 2, "query should only return records with sort keys");
1562 assert_eq!(results_query[0].data, b"with_key_1");
1563 assert_eq!(results_query[1].data, b"with_key_2");
1564
1565 let results_all = storage.get_all(0).await
1567 .expect("get_all should succeed");
1568 assert_eq!(results_all.len(), 3, "get_all should return all records including those without sort keys");
1569
1570 let has_with_key_1 = results_all.iter().any(|r| r.data == b"with_key_1");
1572 let has_with_key_2 = results_all.iter().any(|r| r.data == b"with_key_2");
1573 let has_without_key = results_all.iter().any(|r| r.data == b"no_key");
1574 assert!(has_with_key_1, "get_all should include with_key_1");
1575 assert!(has_with_key_2, "get_all should include with_key_2");
1576 assert!(has_without_key, "get_all should include record without sort key");
1577 }
1578
1579 pub async fn test_query_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1581 clear_partitions(storage, &[0, 1]).await.unwrap();
1582 for i in 0..3 {
1584 let record = Record {
1585 pk: format!("query_iso_a_{}", i).into(),
1586 partition: 0,
1587 sort_key: Some(SortKey::u32_asc(i)),
1588 data: format!("record_{}", i).as_bytes().to_vec(),
1589 };
1590 storage.put(record).await.expect("put should succeed");
1591 }
1592
1593 for i in 0..5 {
1595 let record = Record {
1596 pk: format!("query_iso_b_{}", i).into(),
1597 partition: 1,
1598 sort_key: Some(SortKey::u32_asc(i)),
1599 data: format!("record_{}", i + 100).as_bytes().to_vec(),
1600 };
1601 storage.put(record).await.expect("put should succeed");
1602 }
1603
1604 let results_a = storage
1605 .query_sorted(Query::new_full_range(0))
1606 .await
1607 .expect("query should succeed");
1608
1609 let results_b = storage
1610 .query_sorted(Query::new_full_range(1))
1611 .await
1612 .expect("query should succeed");
1613
1614 assert_eq!(results_a.len(), 3, "partition A should have 3 records");
1615 assert_eq!(results_b.len(), 5, "partition B should have 5 records");
1616
1617 assert!(results_a
1619 .iter()
1620 .all(|r| r.partition == 0));
1621
1622 assert!(results_b
1624 .iter()
1625 .all(|r| r.partition == 1));
1626 }
1627
1628 pub async fn test_query_range<S: StorageAdaptor>(storage: &mut S) {
1630 clear_partitions(storage, &[0]).await.unwrap();
1631
1632 for i in 1..=10u32 {
1634 let record = Record {
1635 pk: format!("query_range_{}", i).into(),
1636 partition: 0,
1637 sort_key: Some(SortKey::u32_asc(i)),
1638 data: format!("record_{}", i).as_bytes().to_vec(),
1639 };
1640 storage.put(record).await.expect("put should succeed");
1641 }
1642
1643 let results_start = storage
1645 .query_sorted(Query::new(0, SortKey::u32_asc(5)..))
1646 .await
1647 .expect("query should succeed");
1648
1649 assert_eq!(results_start.len(), 6, "should return records 5-10");
1650 let values: Vec<_> = results_start.iter().map(|r| r.data.clone()).collect();
1651 assert_eq!(
1652 values,
1653 vec![
1654 b"record_5".to_vec(),
1655 b"record_6".to_vec(),
1656 b"record_7".to_vec(),
1657 b"record_8".to_vec(),
1658 b"record_9".to_vec(),
1659 b"record_10".to_vec(),
1660 ],
1661 "should return records from 5 onwards"
1662 );
1663
1664 let results_end = storage
1666 .query_sorted(Query::new(0, ..=SortKey::u32_asc(3)))
1667 .await
1668 .expect("query should succeed");
1669
1670 assert_eq!(results_end.len(), 3, "should return records 1-3");
1671 let values: Vec<_> = results_end.iter().map(|r| r.data.clone()).collect();
1672 assert_eq!(
1673 values,
1674 vec![
1675 b"record_1".to_vec(),
1676 b"record_2".to_vec(),
1677 b"record_3".to_vec(),
1678 ],
1679 "should return records up to 3"
1680 );
1681
1682 let results_range = storage
1684 .query_sorted(Query::new(0, SortKey::u32_asc(3)..=SortKey::u32_asc(7)))
1685 .await
1686 .expect("query should succeed");
1687
1688 assert_eq!(results_range.len(), 5, "should return records 3-7");
1689 let values: Vec<_> = results_range.iter().map(|r| r.data.clone()).collect();
1690 assert_eq!(
1691 values,
1692 vec![
1693 b"record_3".to_vec(),
1694 b"record_4".to_vec(),
1695 b"record_5".to_vec(),
1696 b"record_6".to_vec(),
1697 b"record_7".to_vec(),
1698 ],
1699 "should return records in range 3-7"
1700 );
1701
1702 let results_range_limit = storage
1704 .query_sorted(Query::new(0, SortKey::u32_asc(2)..=SortKey::u32_asc(8)).limit(3))
1705 .await
1706 .expect("query should succeed");
1707
1708 assert_eq!(results_range_limit.len(), 3, "should return only 3 records due to limit");
1709 let values: Vec<_> = results_range_limit.iter().map(|r| r.data.clone()).collect();
1710 assert_eq!(
1711 values,
1712 vec![
1713 b"record_2".to_vec(),
1714 b"record_3".to_vec(),
1715 b"record_4".to_vec(),
1716 ],
1717 "should return first 3 records in range"
1718 );
1719
1720 let results_empty = storage
1722 .query_sorted(Query::new(0, SortKey::u32_asc(100)..=SortKey::u32_asc(200)))
1723 .await
1724 .expect("query should succeed");
1725
1726 assert!(results_empty.is_empty(), "should return no records for out-of-range query");
1727 }
1728
1729 pub async fn test_query_exclusive_end_range<S: StorageAdaptor>(storage: &mut S) {
1731 clear_partitions(storage, &[0]).await.unwrap();
1732
1733 for i in 1..=10u32 {
1734 let record = Record {
1735 pk: format!("query_excl_{}", i).into(),
1736 partition: 0,
1737 sort_key: Some(SortKey::u32_asc(i)),
1738 data: format!("record_{}", i).as_bytes().to_vec(),
1739 };
1740 storage.put(record).await.expect("put should succeed");
1741 }
1742
1743 let results = storage
1745 .query_sorted(Query::new(0, SortKey::u32_asc(3)..SortKey::u32_asc(7)))
1746 .await
1747 .expect("query should succeed");
1748
1749 assert_eq!(results.len(), 4, "exclusive end should not include record_7");
1750 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1751 assert_eq!(
1752 values,
1753 vec![
1754 b"record_3".to_vec(),
1755 b"record_4".to_vec(),
1756 b"record_5".to_vec(),
1757 b"record_6".to_vec(),
1758 ],
1759 );
1760
1761 let results = storage
1763 .query_sorted(Query::new(0, ..SortKey::u32_asc(4)))
1764 .await
1765 .expect("query should succeed");
1766
1767 assert_eq!(results.len(), 3, "exclusive upper bound should not include record_4");
1768 let values: Vec<_> = results.iter().map(|r| r.data.clone()).collect();
1769 assert_eq!(
1770 values,
1771 vec![
1772 b"record_1".to_vec(),
1773 b"record_2".to_vec(),
1774 b"record_3".to_vec(),
1775 ],
1776 );
1777 }
1778
1779 pub async fn test_query_full_range_limit_one<S: StorageAdaptor>(storage: &mut S) {
1781 clear_partitions(storage, &[0]).await.unwrap();
1782
1783 for i in [5u32, 2, 8, 1, 9] {
1784 let record = Record {
1785 pk: format!("query_limit1_{}", i).into(),
1786 partition: 0,
1787 sort_key: Some(SortKey::u32_asc(i)),
1788 data: format!("record_{}", i).as_bytes().to_vec(),
1789 };
1790 storage.put(record).await.expect("put should succeed");
1791 }
1792
1793 let results = storage
1794 .query_sorted(Query::new_full_range(0).limit(1))
1795 .await
1796 .expect("query should succeed");
1797
1798 assert_eq!(results.len(), 1);
1799 assert_eq!(results[0].data, b"record_1".to_vec(), "limit 1 should return the first record in sort order");
1800 }
1801
1802 pub async fn test_incremental_id_starts_at_one<S: StorageAdaptor>(storage: &mut S) {
1804 storage.delete(LAST_IDS, b"0").await.unwrap();
1806 let id = storage.incremental_id(0).await
1807 .expect("incremental_id should succeed");
1808
1809 assert_eq!(id, 1, "first id should be 1");
1810 }
1811
1812 pub async fn test_incremental_id_increments<S: StorageAdaptor>(storage: &mut S) {
1814 clear_partitions(storage, &[0, LAST_IDS]).await.unwrap();
1815
1816 let id1 = storage.incremental_id(0).await
1817 .expect("incremental_id should succeed");
1818 let id2 = storage.incremental_id(0).await
1819 .expect("incremental_id should succeed");
1820 let id3 = storage.incremental_id(0).await
1821 .expect("incremental_id should succeed");
1822
1823 assert_eq!(id1, 1, "first id should be 1");
1824 assert_eq!(id2, 2, "second id should be 2");
1825 assert_eq!(id3, 3, "third id should be 3");
1826 }
1827
1828 pub async fn test_incremental_id_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1830 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1831
1832 let a1 = storage.incremental_id(0).await
1834 .expect("incremental_id should succeed");
1835 let a2 = storage.incremental_id(0).await
1836 .expect("incremental_id should succeed");
1837 let a3 = storage.incremental_id(0).await
1838 .expect("incremental_id should succeed");
1839
1840 let b1 = storage.incremental_id(1).await
1842 .expect("incremental_id should succeed");
1843 let b2 = storage.incremental_id(1).await
1844 .expect("incremental_id should succeed");
1845
1846 assert_eq!(a1, 1);
1848 assert_eq!(a2, 2);
1849 assert_eq!(a3, 3);
1850
1851 assert_eq!(b1, 1);
1853 assert_eq!(b2, 2);
1854
1855 let a4 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1857 assert_eq!(a4, 4);
1858 }
1859
1860 pub async fn test_incremental_id_persists_across_operations<S: StorageAdaptor>(storage: &mut S) {
1862 clear_partitions(storage, &[0, 1, LAST_IDS]).await.unwrap();
1863
1864 let id1 = storage.incremental_id(0).await
1866 .expect("incremental_id should succeed");
1867 let id2 = storage.incremental_id(0).await
1868 .expect("incremental_id should succeed");
1869 assert_eq!(id1, 1);
1870 assert_eq!(id2, 2);
1871
1872 let stored = storage
1874 .get(LAST_IDS, &[0])
1875 .await
1876 .expect("get should succeed")
1877 .expect("id record should exist");
1878 let stored_id: u32 = serde_json::from_slice(&stored.data).expect("should deserialize");
1879 assert_eq!(stored_id, 2, "stored id should be 2");
1880
1881 let id3 = storage.incremental_id(0).await.expect("incremental_id should succeed");
1883 assert_eq!(id3, 3);
1884 }
1885
1886 pub async fn test_get_all_empty_partition<S: StorageAdaptor>(storage: &mut S) {
1888 clear_partitions(storage, &[0]).await.unwrap();
1889 let results = storage
1890 .get_all(0)
1891 .await
1892 .expect("get_all should succeed");
1893
1894 assert!(results.is_empty(), "get_all should return empty for empty partition");
1895 }
1896
1897 pub async fn test_get_all_returns_all_records<S: StorageAdaptor>(storage: &mut S) {
1899 clear_partitions(storage, &[0]).await.unwrap();
1900
1901 for i in 0..5 {
1903 let record = Record {
1904 pk: format!("get_all_{}", i).into(),
1905 partition: 0,
1906 sort_key: Some(SortKey::u32_asc(i)),
1907 data: format!("record_{}", i).as_bytes().to_vec(),
1908 };
1909 storage.put(record).await.expect("put should succeed");
1910 }
1911
1912 let results = storage
1913 .get_all(0)
1914 .await
1915 .expect("get_all should succeed");
1916
1917 assert_eq!(results.len(), 5, "get_all should return all 5 records");
1918
1919 for i in 0..5 {
1921 let expected_data = format!("record_{}", i).as_bytes().to_vec();
1922 let found = results.iter().any(|r| r.data == expected_data);
1923 assert!(found, "get_all should include record_{}", i);
1924 }
1925 }
1926
1927 pub async fn test_get_all_includes_records_without_sort_key<S: StorageAdaptor>(storage: &mut S) {
1929 clear_partitions(storage, &[0]).await.unwrap();
1930
1931 let with_key_1 = Record {
1933 pk: "get_all_with_1".into(),
1934 partition: 0,
1935 sort_key: Some(SortKey::u32_asc(1)),
1936 data: b"with_key_1".to_vec(),
1937 };
1938 let with_key_2 = Record {
1939 pk: "get_all_with_2".into(),
1940 partition: 0,
1941 sort_key: Some(SortKey::u32_asc(2)),
1942 data: b"with_key_2".to_vec(),
1943 };
1944
1945 let without_key_1 = Record {
1947 pk: "get_all_without_1".into(),
1948 partition: 0,
1949 sort_key: None,
1950 data: b"without_key_1".to_vec(),
1951 };
1952 let without_key_2 = Record {
1953 pk: "get_all_without_2".into(),
1954 partition: 0,
1955 sort_key: None,
1956 data: b"without_key_2".to_vec(),
1957 };
1958
1959 storage.put(with_key_1).await.expect("put should succeed");
1960 storage.put(without_key_1).await.expect("put should succeed");
1961 storage.put(with_key_2).await.expect("put should succeed");
1962 storage.put(without_key_2).await.expect("put should succeed");
1963
1964 let results = storage
1965 .get_all(0)
1966 .await
1967 .expect("get_all should succeed");
1968
1969 assert_eq!(results.len(), 4, "get_all should return all 4 records");
1970
1971 let has_with_1 = results.iter().any(|r| r.data == b"with_key_1");
1973 let has_with_2 = results.iter().any(|r| r.data == b"with_key_2");
1974 let has_without_1 = results.iter().any(|r| r.data == b"without_key_1");
1975 let has_without_2 = results.iter().any(|r| r.data == b"without_key_2");
1976
1977 assert!(has_with_1, "get_all should include with_key_1");
1978 assert!(has_with_2, "get_all should include with_key_2");
1979 assert!(has_without_1, "get_all should include without_key_1");
1980 assert!(has_without_2, "get_all should include without_key_2");
1981
1982 let query_results = storage
1984 .query_sorted(Query::new_full_range(0))
1985 .await
1986 .expect("query should succeed");
1987
1988 assert_eq!(query_results.len(), 2, "query should only return records with sort keys");
1989 let query_has_without = query_results.iter().any(|r| r.sort_key.is_none());
1990 assert!(!query_has_without, "query should not include records without sort keys");
1991 }
1992
1993 pub async fn test_get_all_partition_isolation<S: StorageAdaptor>(storage: &mut S) {
1995 clear_partitions(storage, &[0, 1]).await.unwrap();
1996
1997 for i in 0..3 {
1999 let record = Record {
2000 pk: format!("partition_0_{}", i).into(),
2001 partition: 0,
2002 sort_key: Some(SortKey::u32_asc(i)),
2003 data: format!("p0_record_{}", i).as_bytes().to_vec(),
2004 };
2005 storage.put(record).await.expect("put should succeed");
2006 }
2007
2008 for i in 0..2 {
2010 let record = Record {
2011 pk: format!("partition_1_{}", i).into(),
2012 partition: 1,
2013 sort_key: Some(SortKey::u32_asc(i)),
2014 data: format!("p1_record_{}", i).as_bytes().to_vec(),
2015 };
2016 storage.put(record).await.expect("put should succeed");
2017 }
2018
2019 let results_0 = storage.get_all(0).await.expect("get_all should succeed");
2021 assert_eq!(results_0.len(), 3, "partition 0 should have 3 records");
2022 assert!(results_0.iter().all(|r| r.partition == 0), "all records should be from partition 0");
2023
2024 let results_1 = storage.get_all(1).await.expect("get_all should succeed");
2026 assert_eq!(results_1.len(), 2, "partition 1 should have 2 records");
2027 assert!(results_1.iter().all(|r| r.partition == 1), "all records should be from partition 1");
2028 }
2029
2030 pub async fn test_get_all_after_delete<S: StorageAdaptor>(storage: &mut S) {
2032 clear_partitions(storage, &[0]).await.unwrap();
2033
2034 for i in 0..3u32 {
2035 let record = Record {
2036 pk: format!("get_all_del_{}", i).into(),
2037 partition: 0,
2038 sort_key: Some(SortKey::u32_asc(i)),
2039 data: format!("record_{}", i).as_bytes().to_vec(),
2040 };
2041 storage.put(record).await.expect("put should succeed");
2042 }
2043
2044 storage.delete(0, b"get_all_del_1").await.expect("delete should succeed");
2045
2046 let results = storage.get_all(0).await.expect("get_all should succeed");
2047 assert_eq!(results.len(), 2, "get_all should reflect the deletion");
2048
2049 let has_deleted = results.iter().any(|r| r.data == b"record_1".to_vec());
2050 assert!(!has_deleted, "deleted record should not appear");
2051 }
2052}