1use crate::{
2 declaration::AllocationDeclaration,
3 key::{StableKey, StableKeyError},
4 physical::{CommitRecoveryError, DualCommitStore},
5 schema::SchemaMetadata,
6 session::ValidatedAllocations,
7 slot::AllocationSlotDescriptor,
8};
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeSet;
11
12pub const CURRENT_LEDGER_SCHEMA_VERSION: u32 = 1;
14
15pub const CURRENT_PHYSICAL_FORMAT_ID: u32 = 1;
17
18#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
23pub struct AllocationLedger {
24 pub ledger_schema_version: u32,
26 pub physical_format_id: u32,
28 pub current_generation: u64,
30 pub allocation_history: AllocationHistory,
32}
33
34#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
39pub struct AllocationHistory {
40 pub records: Vec<AllocationRecord>,
42 pub generations: Vec<GenerationRecord>,
44}
45
46#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
51pub struct AllocationRecord {
52 pub stable_key: StableKey,
54 pub slot: AllocationSlotDescriptor,
56 pub state: AllocationState,
58 pub first_generation: u64,
60 pub last_seen_generation: u64,
62 pub retired_generation: Option<u64>,
64 pub schema_history: Vec<SchemaMetadataRecord>,
66}
67
68#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
73pub struct AllocationRetirement {
74 pub stable_key: StableKey,
76 pub slot: AllocationSlotDescriptor,
78}
79
80impl AllocationRetirement {
81 pub fn new(
83 stable_key: impl AsRef<str>,
84 slot: AllocationSlotDescriptor,
85 ) -> Result<Self, AllocationRetirementError> {
86 Ok(Self {
87 stable_key: StableKey::parse(stable_key).map_err(AllocationRetirementError::Key)?,
88 slot,
89 })
90 }
91}
92
93#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
98pub enum AllocationState {
99 Reserved,
101 Active,
103 Retired,
105}
106
107#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
112pub struct SchemaMetadataRecord {
113 pub generation: u64,
115 pub schema: SchemaMetadata,
117}
118
119#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
124pub struct GenerationRecord {
125 pub generation: u64,
127 pub parent_generation: Option<u64>,
129 pub runtime_fingerprint: Option<String>,
131 pub declaration_count: u32,
133 pub committed_at: Option<u64>,
135}
136
137#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
142pub struct LedgerCompatibility {
143 pub min_ledger_schema_version: u32,
145 pub max_ledger_schema_version: u32,
147 pub physical_format_id: u32,
149}
150
151impl LedgerCompatibility {
152 #[must_use]
154 pub const fn current() -> Self {
155 Self {
156 min_ledger_schema_version: CURRENT_LEDGER_SCHEMA_VERSION,
157 max_ledger_schema_version: CURRENT_LEDGER_SCHEMA_VERSION,
158 physical_format_id: CURRENT_PHYSICAL_FORMAT_ID,
159 }
160 }
161
162 pub const fn validate(
164 &self,
165 ledger: &AllocationLedger,
166 ) -> Result<(), LedgerCompatibilityError> {
167 if ledger.ledger_schema_version < self.min_ledger_schema_version {
168 return Err(LedgerCompatibilityError::UnsupportedLedgerSchemaVersion {
169 found: ledger.ledger_schema_version,
170 min_supported: self.min_ledger_schema_version,
171 max_supported: self.max_ledger_schema_version,
172 });
173 }
174 if ledger.ledger_schema_version > self.max_ledger_schema_version {
175 return Err(LedgerCompatibilityError::UnsupportedLedgerSchemaVersion {
176 found: ledger.ledger_schema_version,
177 min_supported: self.min_ledger_schema_version,
178 max_supported: self.max_ledger_schema_version,
179 });
180 }
181 if ledger.physical_format_id != self.physical_format_id {
182 return Err(LedgerCompatibilityError::UnsupportedPhysicalFormat {
183 found: ledger.physical_format_id,
184 supported: self.physical_format_id,
185 });
186 }
187 Ok(())
188 }
189}
190
191impl Default for LedgerCompatibility {
192 fn default() -> Self {
193 Self::current()
194 }
195}
196
197#[derive(Clone, Copy, Debug, Eq, thiserror::Error, PartialEq)]
202pub enum LedgerCompatibilityError {
203 #[error(
205 "ledger_schema_version {found} is unsupported; supported range is {min_supported}-{max_supported}"
206 )]
207 UnsupportedLedgerSchemaVersion {
208 found: u32,
210 min_supported: u32,
212 max_supported: u32,
214 },
215 #[error("physical_format_id {found} is unsupported; supported format is {supported}")]
217 UnsupportedPhysicalFormat {
218 found: u32,
220 supported: u32,
222 },
223}
224
225#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
230pub enum LedgerIntegrityError {
231 #[error("stable key '{stable_key}' appears in more than one allocation record")]
233 DuplicateStableKey {
234 stable_key: StableKey,
236 },
237 #[error("allocation slot '{slot:?}' appears in more than one allocation record")]
239 DuplicateSlot {
240 slot: Box<AllocationSlotDescriptor>,
242 },
243 #[error("stable key '{stable_key}' has first_generation after last_seen_generation")]
245 InvalidRecordGenerationOrder {
246 stable_key: StableKey,
248 first_generation: u64,
250 last_seen_generation: u64,
252 },
253 #[error(
255 "stable key '{stable_key}' references generation {generation} after current generation {current_generation}"
256 )]
257 FutureRecordGeneration {
258 stable_key: StableKey,
260 generation: u64,
262 current_generation: u64,
264 },
265 #[error("stable key '{stable_key}' is not retired but has retired_generation metadata")]
267 UnexpectedRetiredGeneration {
268 stable_key: StableKey,
270 },
271 #[error("stable key '{stable_key}' is retired but retired_generation is missing")]
273 MissingRetiredGeneration {
274 stable_key: StableKey,
276 },
277 #[error("stable key '{stable_key}' has retired_generation before first_generation")]
279 RetiredBeforeFirstGeneration {
280 stable_key: StableKey,
282 first_generation: u64,
284 retired_generation: u64,
286 },
287 #[error("stable key '{stable_key}' has empty schema metadata history")]
289 EmptySchemaHistory {
290 stable_key: StableKey,
292 },
293 #[error("stable key '{stable_key}' has non-increasing schema metadata generation history")]
295 NonIncreasingSchemaHistory {
296 stable_key: StableKey,
298 },
299 #[error("stable key '{stable_key}' has schema metadata generation outside the ledger bounds")]
301 SchemaHistoryOutOfBounds {
302 stable_key: StableKey,
304 generation: u64,
306 },
307 #[error("generation {generation} appears more than once")]
309 DuplicateGeneration {
310 generation: u64,
312 },
313 #[error("generation {generation} is after current generation {current_generation}")]
315 FutureGeneration {
316 generation: u64,
318 current_generation: u64,
320 },
321 #[error("generation {generation} has invalid parent generation {parent_generation:?}")]
323 InvalidParentGeneration {
324 generation: u64,
326 parent_generation: Option<u64>,
328 },
329}
330
331pub trait LedgerCodec {
336 type Error;
338
339 fn encode(&self, ledger: &AllocationLedger) -> Result<Vec<u8>, Self::Error>;
341
342 fn decode(&self, bytes: &[u8]) -> Result<AllocationLedger, Self::Error>;
344}
345
346#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
355pub struct LedgerCommitStore {
356 pub physical: DualCommitStore,
358}
359
360impl LedgerCommitStore {
361 pub fn recover<C: LedgerCodec>(
363 &self,
364 codec: &C,
365 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
366 self.recover_with_compatibility(codec, LedgerCompatibility::current())
367 }
368
369 pub fn recover_with_compatibility<C: LedgerCodec>(
371 &self,
372 codec: &C,
373 compatibility: LedgerCompatibility,
374 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
375 let committed = self
376 .physical
377 .authoritative()
378 .map_err(LedgerCommitError::Recovery)?;
379 let ledger = codec
380 .decode(&committed.payload)
381 .map_err(LedgerCommitError::Codec)?;
382 compatibility
383 .validate(&ledger)
384 .map_err(LedgerCommitError::Compatibility)?;
385 ledger
386 .validate_integrity()
387 .map_err(LedgerCommitError::Integrity)?;
388 Ok(ledger)
389 }
390
391 pub fn recover_or_initialize<C: LedgerCodec>(
397 &mut self,
398 codec: &C,
399 genesis: &AllocationLedger,
400 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
401 self.recover_or_initialize_with_compatibility(
402 codec,
403 genesis,
404 LedgerCompatibility::current(),
405 )
406 }
407
408 pub fn recover_or_initialize_with_compatibility<C: LedgerCodec>(
410 &mut self,
411 codec: &C,
412 genesis: &AllocationLedger,
413 compatibility: LedgerCompatibility,
414 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
415 match self.recover_with_compatibility(codec, compatibility) {
416 Ok(ledger) => Ok(ledger),
417 Err(LedgerCommitError::Recovery(CommitRecoveryError::NoValidGeneration))
418 if self.physical.is_uninitialized() =>
419 {
420 self.commit_with_compatibility(genesis, codec, compatibility)
421 }
422 Err(err) => Err(err),
423 }
424 }
425
426 pub fn commit<C: LedgerCodec>(
428 &mut self,
429 ledger: &AllocationLedger,
430 codec: &C,
431 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
432 self.commit_with_compatibility(ledger, codec, LedgerCompatibility::current())
433 }
434
435 pub fn commit_with_compatibility<C: LedgerCodec>(
437 &mut self,
438 ledger: &AllocationLedger,
439 codec: &C,
440 compatibility: LedgerCompatibility,
441 ) -> Result<AllocationLedger, LedgerCommitError<C::Error>> {
442 compatibility
443 .validate(ledger)
444 .map_err(LedgerCommitError::Compatibility)?;
445 ledger
446 .validate_integrity()
447 .map_err(LedgerCommitError::Integrity)?;
448 let payload = codec.encode(ledger).map_err(LedgerCommitError::Codec)?;
449 self.physical
450 .commit_payload(payload)
451 .map_err(LedgerCommitError::Recovery)?;
452 self.recover_with_compatibility(codec, compatibility)
453 }
454
455 pub fn write_corrupt_inactive_ledger<C: LedgerCodec>(
457 &mut self,
458 ledger: &AllocationLedger,
459 codec: &C,
460 ) -> Result<(), LedgerCommitError<C::Error>> {
461 let payload = codec.encode(ledger).map_err(LedgerCommitError::Codec)?;
462 self.physical
463 .write_corrupt_inactive_slot(ledger.current_generation, payload);
464 Ok(())
465 }
466}
467
468#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
473pub enum LedgerCommitError<E> {
474 #[error(transparent)]
476 Recovery(CommitRecoveryError),
477 #[error("allocation ledger codec failed")]
479 Codec(E),
480 #[error(transparent)]
482 Compatibility(LedgerCompatibilityError),
483 #[error(transparent)]
485 Integrity(LedgerIntegrityError),
486}
487
488#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
493pub enum AllocationReservationError {
494 #[error("stable key '{stable_key}' was historically bound to a different allocation slot")]
496 StableKeySlotConflict {
497 stable_key: StableKey,
499 historical_slot: Box<AllocationSlotDescriptor>,
501 reserved_slot: Box<AllocationSlotDescriptor>,
503 },
504 #[error("allocation slot '{slot:?}' was historically bound to stable key '{historical_key}'")]
506 SlotStableKeyConflict {
507 slot: Box<AllocationSlotDescriptor>,
509 historical_key: StableKey,
511 reserved_key: StableKey,
513 },
514 #[error("stable key '{stable_key}' is already active and cannot be reserved")]
516 ActiveAllocation {
517 stable_key: StableKey,
519 slot: Box<AllocationSlotDescriptor>,
521 },
522 #[error("stable key '{stable_key}' was explicitly retired and cannot be reserved")]
524 RetiredAllocation {
525 stable_key: StableKey,
527 slot: Box<AllocationSlotDescriptor>,
529 },
530}
531
532#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
537pub enum AllocationRetirementError {
538 #[error(transparent)]
540 Key(StableKeyError),
541 #[error("stable key '{0}' has no allocation record to retire")]
543 UnknownStableKey(StableKey),
544 #[error("stable key '{stable_key}' cannot be retired for a different allocation slot")]
546 SlotMismatch {
547 stable_key: StableKey,
549 historical_slot: Box<AllocationSlotDescriptor>,
551 retired_slot: Box<AllocationSlotDescriptor>,
553 },
554 #[error("stable key '{stable_key}' was already retired")]
556 AlreadyRetired {
557 stable_key: StableKey,
559 slot: Box<AllocationSlotDescriptor>,
561 },
562}
563
564impl AllocationRecord {
565 #[must_use]
567 pub fn from_declaration(
568 generation: u64,
569 declaration: AllocationDeclaration,
570 state: AllocationState,
571 ) -> Self {
572 Self {
573 stable_key: declaration.stable_key,
574 slot: declaration.slot,
575 state,
576 first_generation: generation,
577 last_seen_generation: generation,
578 retired_generation: None,
579 schema_history: vec![SchemaMetadataRecord {
580 generation,
581 schema: declaration.schema,
582 }],
583 }
584 }
585
586 #[must_use]
588 pub fn reserved(generation: u64, declaration: AllocationDeclaration) -> Self {
589 Self::from_declaration(generation, declaration, AllocationState::Reserved)
590 }
591
592 fn observe_declaration(&mut self, generation: u64, declaration: &AllocationDeclaration) {
593 self.last_seen_generation = generation;
594 if self.state == AllocationState::Reserved {
595 self.state = AllocationState::Active;
596 }
597
598 let latest_schema = self.schema_history.last().map(|record| &record.schema);
599 if latest_schema != Some(&declaration.schema) {
600 self.schema_history.push(SchemaMetadataRecord {
601 generation,
602 schema: declaration.schema.clone(),
603 });
604 }
605 }
606
607 fn observe_reservation(&mut self, generation: u64, reservation: &AllocationDeclaration) {
608 self.last_seen_generation = generation;
609
610 let latest_schema = self.schema_history.last().map(|record| &record.schema);
611 if latest_schema != Some(&reservation.schema) {
612 self.schema_history.push(SchemaMetadataRecord {
613 generation,
614 schema: reservation.schema.clone(),
615 });
616 }
617 }
618}
619
620impl AllocationLedger {
621 pub fn validate_integrity(&self) -> Result<(), LedgerIntegrityError> {
623 let mut stable_keys = BTreeSet::new();
624 let mut slots = BTreeSet::new();
625
626 for record in &self.allocation_history.records {
627 if !stable_keys.insert(record.stable_key.clone()) {
628 return Err(LedgerIntegrityError::DuplicateStableKey {
629 stable_key: record.stable_key.clone(),
630 });
631 }
632 if !slots.insert(record.slot.clone()) {
633 return Err(LedgerIntegrityError::DuplicateSlot {
634 slot: Box::new(record.slot.clone()),
635 });
636 }
637 validate_record_integrity(self.current_generation, record)?;
638 }
639
640 let mut generations = BTreeSet::new();
641 for generation in &self.allocation_history.generations {
642 if !generations.insert(generation.generation) {
643 return Err(LedgerIntegrityError::DuplicateGeneration {
644 generation: generation.generation,
645 });
646 }
647 if generation.generation > self.current_generation {
648 return Err(LedgerIntegrityError::FutureGeneration {
649 generation: generation.generation,
650 current_generation: self.current_generation,
651 });
652 }
653 if generation
654 .parent_generation
655 .is_some_and(|parent| parent >= generation.generation)
656 {
657 return Err(LedgerIntegrityError::InvalidParentGeneration {
658 generation: generation.generation,
659 parent_generation: generation.parent_generation,
660 });
661 }
662 }
663
664 Ok(())
665 }
666
667 #[must_use]
672 pub fn stage_validated_generation(
673 &self,
674 validated: &ValidatedAllocations,
675 committed_at: Option<u64>,
676 ) -> Self {
677 let next_generation = self.current_generation.saturating_add(1);
678 let mut next = self.clone();
679 next.current_generation = next_generation;
680 let declaration_count = u32::try_from(validated.declarations().len()).unwrap_or(u32::MAX);
681
682 for declaration in validated.declarations() {
683 record_declaration(&mut next, next_generation, declaration);
684 }
685
686 next.allocation_history.generations.push(GenerationRecord {
687 generation: next_generation,
688 parent_generation: Some(self.current_generation),
689 runtime_fingerprint: validated.runtime_fingerprint().map(str::to_string),
690 declaration_count,
691 committed_at,
692 });
693
694 next
695 }
696
697 pub fn stage_reservation_generation(
702 &self,
703 reservations: &[AllocationDeclaration],
704 committed_at: Option<u64>,
705 ) -> Result<Self, AllocationReservationError> {
706 let next_generation = self.current_generation.saturating_add(1);
707 let mut next = self.clone();
708 next.current_generation = next_generation;
709
710 for reservation in reservations {
711 record_reservation(&mut next, next_generation, reservation)?;
712 }
713
714 next.allocation_history.generations.push(GenerationRecord {
715 generation: next_generation,
716 parent_generation: Some(self.current_generation),
717 runtime_fingerprint: None,
718 declaration_count: u32::try_from(reservations.len()).unwrap_or(u32::MAX),
719 committed_at,
720 });
721
722 Ok(next)
723 }
724
725 pub fn stage_retirement_generation(
727 &self,
728 retirement: &AllocationRetirement,
729 committed_at: Option<u64>,
730 ) -> Result<Self, AllocationRetirementError> {
731 let next_generation = self.current_generation.saturating_add(1);
732 let mut next = self.clone();
733 let record = next
734 .allocation_history
735 .records
736 .iter_mut()
737 .find(|record| record.stable_key == retirement.stable_key)
738 .ok_or_else(|| {
739 AllocationRetirementError::UnknownStableKey(retirement.stable_key.clone())
740 })?;
741
742 if record.slot != retirement.slot {
743 return Err(AllocationRetirementError::SlotMismatch {
744 stable_key: retirement.stable_key.clone(),
745 historical_slot: Box::new(record.slot.clone()),
746 retired_slot: Box::new(retirement.slot.clone()),
747 });
748 }
749 if record.state == AllocationState::Retired {
750 return Err(AllocationRetirementError::AlreadyRetired {
751 stable_key: retirement.stable_key.clone(),
752 slot: Box::new(record.slot.clone()),
753 });
754 }
755
756 record.state = AllocationState::Retired;
757 record.retired_generation = Some(next_generation);
758 next.current_generation = next_generation;
759 next.allocation_history.generations.push(GenerationRecord {
760 generation: next_generation,
761 parent_generation: Some(self.current_generation),
762 runtime_fingerprint: None,
763 declaration_count: 0,
764 committed_at,
765 });
766
767 Ok(next)
768 }
769}
770
771fn record_declaration(
772 ledger: &mut AllocationLedger,
773 generation: u64,
774 declaration: &AllocationDeclaration,
775) {
776 if let Some(record) = ledger
777 .allocation_history
778 .records
779 .iter_mut()
780 .find(|record| record.stable_key == declaration.stable_key)
781 {
782 record.observe_declaration(generation, declaration);
783 return;
784 }
785
786 ledger
787 .allocation_history
788 .records
789 .push(AllocationRecord::from_declaration(
790 generation,
791 declaration.clone(),
792 AllocationState::Active,
793 ));
794}
795
796fn record_reservation(
797 ledger: &mut AllocationLedger,
798 generation: u64,
799 reservation: &AllocationDeclaration,
800) -> Result<(), AllocationReservationError> {
801 if let Some(record) = ledger
802 .allocation_history
803 .records
804 .iter_mut()
805 .find(|record| record.stable_key == reservation.stable_key)
806 {
807 if record.slot != reservation.slot {
808 return Err(AllocationReservationError::StableKeySlotConflict {
809 stable_key: reservation.stable_key.clone(),
810 historical_slot: Box::new(record.slot.clone()),
811 reserved_slot: Box::new(reservation.slot.clone()),
812 });
813 }
814
815 return match record.state {
816 AllocationState::Reserved => {
817 record.observe_reservation(generation, reservation);
818 Ok(())
819 }
820 AllocationState::Active => Err(AllocationReservationError::ActiveAllocation {
821 stable_key: reservation.stable_key.clone(),
822 slot: Box::new(record.slot.clone()),
823 }),
824 AllocationState::Retired => Err(AllocationReservationError::RetiredAllocation {
825 stable_key: reservation.stable_key.clone(),
826 slot: Box::new(record.slot.clone()),
827 }),
828 };
829 }
830
831 if let Some(record) = ledger
832 .allocation_history
833 .records
834 .iter()
835 .find(|record| record.slot == reservation.slot)
836 {
837 return Err(AllocationReservationError::SlotStableKeyConflict {
838 slot: Box::new(reservation.slot.clone()),
839 historical_key: record.stable_key.clone(),
840 reserved_key: reservation.stable_key.clone(),
841 });
842 }
843
844 ledger
845 .allocation_history
846 .records
847 .push(AllocationRecord::reserved(generation, reservation.clone()));
848 Ok(())
849}
850
851fn validate_record_integrity(
852 current_generation: u64,
853 record: &AllocationRecord,
854) -> Result<(), LedgerIntegrityError> {
855 if record.first_generation > record.last_seen_generation {
856 return Err(LedgerIntegrityError::InvalidRecordGenerationOrder {
857 stable_key: record.stable_key.clone(),
858 first_generation: record.first_generation,
859 last_seen_generation: record.last_seen_generation,
860 });
861 }
862 if record.last_seen_generation > current_generation {
863 return Err(LedgerIntegrityError::FutureRecordGeneration {
864 stable_key: record.stable_key.clone(),
865 generation: record.last_seen_generation,
866 current_generation,
867 });
868 }
869
870 match (record.state, record.retired_generation) {
871 (AllocationState::Retired, Some(retired_generation)) => {
872 if retired_generation < record.first_generation {
873 return Err(LedgerIntegrityError::RetiredBeforeFirstGeneration {
874 stable_key: record.stable_key.clone(),
875 first_generation: record.first_generation,
876 retired_generation,
877 });
878 }
879 if retired_generation > current_generation {
880 return Err(LedgerIntegrityError::FutureRecordGeneration {
881 stable_key: record.stable_key.clone(),
882 generation: retired_generation,
883 current_generation,
884 });
885 }
886 }
887 (AllocationState::Retired, None) => {
888 return Err(LedgerIntegrityError::MissingRetiredGeneration {
889 stable_key: record.stable_key.clone(),
890 });
891 }
892 (AllocationState::Reserved | AllocationState::Active, Some(_)) => {
893 return Err(LedgerIntegrityError::UnexpectedRetiredGeneration {
894 stable_key: record.stable_key.clone(),
895 });
896 }
897 (AllocationState::Reserved | AllocationState::Active, None) => {}
898 }
899
900 validate_schema_history_integrity(current_generation, record)
901}
902
903fn validate_schema_history_integrity(
904 current_generation: u64,
905 record: &AllocationRecord,
906) -> Result<(), LedgerIntegrityError> {
907 if record.schema_history.is_empty() {
908 return Err(LedgerIntegrityError::EmptySchemaHistory {
909 stable_key: record.stable_key.clone(),
910 });
911 }
912
913 let mut previous = None;
914 for schema in &record.schema_history {
915 if previous.is_some_and(|generation| schema.generation <= generation) {
916 return Err(LedgerIntegrityError::NonIncreasingSchemaHistory {
917 stable_key: record.stable_key.clone(),
918 });
919 }
920 if schema.generation < record.first_generation || schema.generation > current_generation {
921 return Err(LedgerIntegrityError::SchemaHistoryOutOfBounds {
922 stable_key: record.stable_key.clone(),
923 generation: schema.generation,
924 });
925 }
926 previous = Some(schema.generation);
927 }
928
929 Ok(())
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::{declaration::DeclarationSnapshot, schema::SchemaMetadata};
936
937 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
938 struct TestCodec;
939
940 impl LedgerCodec for TestCodec {
941 type Error = &'static str;
942
943 fn encode(&self, ledger: &AllocationLedger) -> Result<Vec<u8>, Self::Error> {
944 let mut bytes = Vec::with_capacity(16);
945 bytes.extend_from_slice(&ledger.ledger_schema_version.to_le_bytes());
946 bytes.extend_from_slice(&ledger.physical_format_id.to_le_bytes());
947 bytes.extend_from_slice(&ledger.current_generation.to_le_bytes());
948 Ok(bytes)
949 }
950
951 fn decode(&self, bytes: &[u8]) -> Result<AllocationLedger, Self::Error> {
952 let bytes = <[u8; 16]>::try_from(bytes).map_err(|_| "invalid ledger")?;
953 let ledger_schema_version =
954 u32::from_le_bytes(bytes[0..4].try_into().map_err(|_| "invalid schema")?);
955 let physical_format_id =
956 u32::from_le_bytes(bytes[4..8].try_into().map_err(|_| "invalid format")?);
957 let current_generation =
958 u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| "invalid generation")?);
959 Ok(AllocationLedger {
960 ledger_schema_version,
961 physical_format_id,
962 current_generation,
963 ..ledger()
964 })
965 }
966 }
967
968 fn declaration(key: &str, id: u8, schema_version: Option<u32>) -> AllocationDeclaration {
969 AllocationDeclaration::new(
970 key,
971 AllocationSlotDescriptor::memory_manager(id),
972 None,
973 SchemaMetadata {
974 schema_version,
975 schema_fingerprint: None,
976 },
977 )
978 .expect("declaration")
979 }
980
981 fn ledger() -> AllocationLedger {
982 AllocationLedger {
983 ledger_schema_version: CURRENT_LEDGER_SCHEMA_VERSION,
984 physical_format_id: CURRENT_PHYSICAL_FORMAT_ID,
985 current_generation: 3,
986 allocation_history: AllocationHistory::default(),
987 }
988 }
989
990 fn active_record(key: &str, id: u8) -> AllocationRecord {
991 AllocationRecord::from_declaration(1, declaration(key, id, None), AllocationState::Active)
992 }
993
994 fn validated(
995 generation: u64,
996 declarations: Vec<AllocationDeclaration>,
997 ) -> crate::session::ValidatedAllocations {
998 crate::session::ValidatedAllocations::new(generation, declarations, None)
999 }
1000
1001 #[test]
1002 fn stage_validated_generation_records_new_allocations() {
1003 let declarations = vec![declaration("app.users.v1", 100, Some(1))];
1004 let validated = validated(3, declarations);
1005
1006 let staged = ledger().stage_validated_generation(&validated, Some(42));
1007
1008 assert_eq!(staged.current_generation, 4);
1009 assert_eq!(staged.allocation_history.records.len(), 1);
1010 assert_eq!(staged.allocation_history.records[0].first_generation, 4);
1011 assert_eq!(staged.allocation_history.generations[0].generation, 4);
1012 assert_eq!(
1013 staged.allocation_history.generations[0].committed_at,
1014 Some(42)
1015 );
1016 }
1017
1018 #[test]
1019 fn stage_validated_generation_preserves_omitted_records() {
1020 let first = validated(
1021 3,
1022 vec![
1023 declaration("app.users.v1", 100, Some(1)),
1024 declaration("app.orders.v1", 101, Some(1)),
1025 ],
1026 );
1027 let second = validated(4, vec![declaration("app.users.v1", 100, Some(1))]);
1028
1029 let staged = ledger().stage_validated_generation(&first, None);
1030 let staged = staged.stage_validated_generation(&second, None);
1031
1032 assert_eq!(staged.current_generation, 5);
1033 assert_eq!(staged.allocation_history.records.len(), 2);
1034 let omitted = staged
1035 .allocation_history
1036 .records
1037 .iter()
1038 .find(|record| record.stable_key.as_str() == "app.orders.v1")
1039 .expect("omitted record");
1040 assert_eq!(omitted.state, AllocationState::Active);
1041 assert_eq!(omitted.last_seen_generation, 4);
1042 }
1043
1044 #[test]
1045 fn stage_validated_generation_records_schema_metadata_history() {
1046 let first = validated(3, vec![declaration("app.users.v1", 100, Some(1))]);
1047 let second = validated(4, vec![declaration("app.users.v1", 100, Some(2))]);
1048
1049 let staged = ledger().stage_validated_generation(&first, None);
1050 let staged = staged.stage_validated_generation(&second, None);
1051 let record = &staged.allocation_history.records[0];
1052
1053 assert_eq!(record.schema_history.len(), 2);
1054 assert_eq!(record.schema_history[0].generation, 4);
1055 assert_eq!(record.schema_history[1].generation, 5);
1056 }
1057
1058 #[test]
1059 fn stage_reservation_generation_records_reserved_allocations() {
1060 let reservations = vec![declaration("ic_memory.generation_log.v1", 1, None)];
1061
1062 let staged = ledger()
1063 .stage_reservation_generation(&reservations, Some(42))
1064 .expect("reserved generation");
1065
1066 assert_eq!(staged.current_generation, 4);
1067 assert_eq!(staged.allocation_history.records.len(), 1);
1068 assert_eq!(
1069 staged.allocation_history.records[0].state,
1070 AllocationState::Reserved
1071 );
1072 assert_eq!(
1073 staged.allocation_history.generations[0].declaration_count,
1074 1
1075 );
1076 }
1077
1078 #[test]
1079 fn stage_reservation_generation_rejects_active_allocation() {
1080 let active = validated(3, vec![declaration("app.users.v1", 100, None)]);
1081 let staged = ledger().stage_validated_generation(&active, None);
1082 let reservations = vec![declaration("app.users.v1", 100, None)];
1083
1084 let err = staged
1085 .stage_reservation_generation(&reservations, None)
1086 .expect_err("active cannot become reserved");
1087
1088 assert!(matches!(
1089 err,
1090 AllocationReservationError::ActiveAllocation { .. }
1091 ));
1092 }
1093
1094 #[test]
1095 fn stage_validated_generation_activates_reserved_record() {
1096 let reservations = vec![declaration("app.future_store.v1", 100, Some(1))];
1097 let staged = ledger()
1098 .stage_reservation_generation(&reservations, None)
1099 .expect("reserved generation");
1100 let active = validated(4, vec![declaration("app.future_store.v1", 100, Some(2))]);
1101
1102 let staged = staged.stage_validated_generation(&active, None);
1103 let record = &staged.allocation_history.records[0];
1104
1105 assert_eq!(record.state, AllocationState::Active);
1106 assert_eq!(record.first_generation, 4);
1107 assert_eq!(record.last_seen_generation, 5);
1108 assert_eq!(record.schema_history.len(), 2);
1109 }
1110
1111 #[test]
1112 fn stage_retirement_generation_tombstones_named_allocation() {
1113 let active = validated(3, vec![declaration("app.users.v1", 100, None)]);
1114 let staged = ledger().stage_validated_generation(&active, None);
1115 let retirement = AllocationRetirement::new(
1116 "app.users.v1",
1117 AllocationSlotDescriptor::memory_manager(100),
1118 )
1119 .expect("retirement");
1120
1121 let staged = staged
1122 .stage_retirement_generation(&retirement, Some(42))
1123 .expect("retired generation");
1124 let record = &staged.allocation_history.records[0];
1125
1126 assert_eq!(staged.current_generation, 5);
1127 assert_eq!(record.state, AllocationState::Retired);
1128 assert_eq!(record.retired_generation, Some(5));
1129 assert_eq!(
1130 staged.allocation_history.generations[1].declaration_count,
1131 0
1132 );
1133 }
1134
1135 #[test]
1136 fn stage_retirement_generation_requires_matching_slot() {
1137 let active = validated(3, vec![declaration("app.users.v1", 100, None)]);
1138 let staged = ledger().stage_validated_generation(&active, None);
1139 let retirement = AllocationRetirement::new(
1140 "app.users.v1",
1141 AllocationSlotDescriptor::memory_manager(101),
1142 )
1143 .expect("retirement");
1144
1145 let err = staged
1146 .stage_retirement_generation(&retirement, None)
1147 .expect_err("slot mismatch");
1148
1149 assert!(matches!(
1150 err,
1151 AllocationRetirementError::SlotMismatch { .. }
1152 ));
1153 }
1154
1155 #[test]
1156 fn snapshot_can_feed_validated_generation() {
1157 let snapshot = DeclarationSnapshot::new(vec![declaration("app.users.v1", 100, None)])
1158 .expect("snapshot");
1159 let (declarations, runtime_fingerprint) = snapshot.into_parts();
1160 let validated =
1161 crate::session::ValidatedAllocations::new(3, declarations, runtime_fingerprint);
1162
1163 let staged = ledger().stage_validated_generation(&validated, None);
1164
1165 assert_eq!(staged.allocation_history.records.len(), 1);
1166 }
1167
1168 #[test]
1169 fn stage_validated_generation_records_runtime_fingerprint() {
1170 let validated = crate::session::ValidatedAllocations::new(
1171 3,
1172 vec![declaration("app.users.v1", 100, None)],
1173 Some("wasm:abc123".to_string()),
1174 );
1175
1176 let staged = ledger().stage_validated_generation(&validated, None);
1177
1178 assert_eq!(
1179 staged.allocation_history.generations[0].runtime_fingerprint,
1180 Some("wasm:abc123".to_string())
1181 );
1182 }
1183
1184 #[test]
1185 fn validate_integrity_rejects_duplicate_stable_keys() {
1186 let mut ledger = ledger();
1187 ledger.allocation_history.records = vec![
1188 active_record("app.users.v1", 100),
1189 active_record("app.users.v1", 101),
1190 ];
1191
1192 let err = ledger.validate_integrity().expect_err("duplicate key");
1193
1194 assert!(matches!(
1195 err,
1196 LedgerIntegrityError::DuplicateStableKey { .. }
1197 ));
1198 }
1199
1200 #[test]
1201 fn validate_integrity_rejects_duplicate_slots() {
1202 let mut ledger = ledger();
1203 ledger.allocation_history.records = vec![
1204 active_record("app.users.v1", 100),
1205 active_record("app.orders.v1", 100),
1206 ];
1207
1208 let err = ledger.validate_integrity().expect_err("duplicate slot");
1209
1210 assert!(matches!(err, LedgerIntegrityError::DuplicateSlot { .. }));
1211 }
1212
1213 #[test]
1214 fn validate_integrity_rejects_retired_record_without_retired_generation() {
1215 let mut ledger = ledger();
1216 let mut record = active_record("app.users.v1", 100);
1217 record.state = AllocationState::Retired;
1218 ledger.allocation_history.records = vec![record];
1219
1220 let err = ledger
1221 .validate_integrity()
1222 .expect_err("missing retired generation");
1223
1224 assert!(matches!(
1225 err,
1226 LedgerIntegrityError::MissingRetiredGeneration { .. }
1227 ));
1228 }
1229
1230 #[test]
1231 fn validate_integrity_rejects_non_retired_record_with_retired_generation() {
1232 let mut ledger = ledger();
1233 let mut record = active_record("app.users.v1", 100);
1234 record.retired_generation = Some(2);
1235 ledger.allocation_history.records = vec![record];
1236
1237 let err = ledger
1238 .validate_integrity()
1239 .expect_err("unexpected retired generation");
1240
1241 assert!(matches!(
1242 err,
1243 LedgerIntegrityError::UnexpectedRetiredGeneration { .. }
1244 ));
1245 }
1246
1247 #[test]
1248 fn validate_integrity_rejects_non_increasing_schema_history() {
1249 let mut ledger = ledger();
1250 let mut record = active_record("app.users.v1", 100);
1251 record.schema_history.push(SchemaMetadataRecord {
1252 generation: 1,
1253 schema: SchemaMetadata::default(),
1254 });
1255 ledger.allocation_history.records = vec![record];
1256
1257 let err = ledger
1258 .validate_integrity()
1259 .expect_err("non-increasing schema history");
1260
1261 assert!(matches!(
1262 err,
1263 LedgerIntegrityError::NonIncreasingSchemaHistory { .. }
1264 ));
1265 }
1266
1267 #[test]
1268 fn ledger_commit_store_rejects_invalid_ledger_before_write() {
1269 let mut store = LedgerCommitStore::default();
1270 let codec = TestCodec;
1271 let mut invalid = ledger();
1272 invalid.allocation_history.records = vec![
1273 active_record("app.users.v1", 100),
1274 active_record("app.orders.v1", 100),
1275 ];
1276
1277 let err = store.commit(&invalid, &codec).expect_err("invalid ledger");
1278
1279 assert!(matches!(
1280 err,
1281 LedgerCommitError::Integrity(LedgerIntegrityError::DuplicateSlot { .. })
1282 ));
1283 assert!(store.physical.is_uninitialized());
1284 }
1285
1286 #[test]
1287 fn ledger_commit_store_recovers_latest_committed_ledger() {
1288 let mut store = LedgerCommitStore::default();
1289 let codec = TestCodec;
1290 let first = AllocationLedger {
1291 current_generation: 1,
1292 ..ledger()
1293 };
1294 let second = AllocationLedger {
1295 current_generation: 2,
1296 ..ledger()
1297 };
1298
1299 store.commit(&first, &codec).expect("first commit");
1300 store.commit(&second, &codec).expect("second commit");
1301 let recovered = store.recover(&codec).expect("recovered ledger");
1302
1303 assert_eq!(recovered.current_generation, 2);
1304 }
1305
1306 #[test]
1307 fn ledger_commit_store_ignores_corrupt_inactive_ledger() {
1308 let mut store = LedgerCommitStore::default();
1309 let codec = TestCodec;
1310 let first = AllocationLedger {
1311 current_generation: 1,
1312 ..ledger()
1313 };
1314 let second = AllocationLedger {
1315 current_generation: 2,
1316 ..ledger()
1317 };
1318
1319 store.commit(&first, &codec).expect("first commit");
1320 store
1321 .write_corrupt_inactive_ledger(&second, &codec)
1322 .expect("corrupt write");
1323 let recovered = store.recover(&codec).expect("recovered ledger");
1324
1325 assert_eq!(recovered.current_generation, 1);
1326 }
1327
1328 #[test]
1329 fn ledger_commit_store_initializes_empty_store_explicitly() {
1330 let mut store = LedgerCommitStore::default();
1331 let codec = TestCodec;
1332 let genesis = ledger();
1333
1334 let recovered = store
1335 .recover_or_initialize(&codec, &genesis)
1336 .expect("initialized ledger");
1337
1338 assert_eq!(recovered.current_generation, 3);
1339 assert!(!store.physical.is_uninitialized());
1340 }
1341
1342 #[test]
1343 fn ledger_commit_store_rejects_corrupt_store_even_with_genesis() {
1344 let mut store = LedgerCommitStore::default();
1345 let codec = TestCodec;
1346 store
1347 .write_corrupt_inactive_ledger(&ledger(), &codec)
1348 .expect("corrupt write");
1349
1350 let err = store
1351 .recover_or_initialize(&codec, &ledger())
1352 .expect_err("corrupt state");
1353
1354 assert!(matches!(
1355 err,
1356 LedgerCommitError::Recovery(CommitRecoveryError::NoValidGeneration)
1357 ));
1358 }
1359
1360 #[test]
1361 fn ledger_commit_store_rejects_incompatible_schema_before_write() {
1362 let mut store = LedgerCommitStore::default();
1363 let codec = TestCodec;
1364 let incompatible = AllocationLedger {
1365 ledger_schema_version: CURRENT_LEDGER_SCHEMA_VERSION + 1,
1366 ..ledger()
1367 };
1368
1369 let err = store
1370 .commit(&incompatible, &codec)
1371 .expect_err("incompatible schema");
1372
1373 assert!(matches!(
1374 err,
1375 LedgerCommitError::Compatibility(
1376 LedgerCompatibilityError::UnsupportedLedgerSchemaVersion { .. }
1377 )
1378 ));
1379 assert!(store.physical.is_uninitialized());
1380 }
1381
1382 #[test]
1383 fn ledger_commit_store_rejects_incompatible_schema_on_recovery() {
1384 let mut store = LedgerCommitStore::default();
1385 let codec = TestCodec;
1386 let incompatible = AllocationLedger {
1387 ledger_schema_version: CURRENT_LEDGER_SCHEMA_VERSION + 1,
1388 ..ledger()
1389 };
1390 let payload = codec.encode(&incompatible).expect("payload");
1391 store
1392 .physical
1393 .commit_payload(payload)
1394 .expect("physical commit");
1395
1396 let err = store.recover(&codec).expect_err("incompatible schema");
1397
1398 assert!(matches!(
1399 err,
1400 LedgerCommitError::Compatibility(
1401 LedgerCompatibilityError::UnsupportedLedgerSchemaVersion { .. }
1402 )
1403 ));
1404 }
1405
1406 #[test]
1407 fn ledger_commit_store_rejects_incompatible_physical_format() {
1408 let mut store = LedgerCommitStore::default();
1409 let codec = TestCodec;
1410 let incompatible = AllocationLedger {
1411 physical_format_id: CURRENT_PHYSICAL_FORMAT_ID + 1,
1412 ..ledger()
1413 };
1414
1415 let err = store
1416 .recover_or_initialize(&codec, &incompatible)
1417 .expect_err("incompatible format");
1418
1419 assert!(matches!(
1420 err,
1421 LedgerCommitError::Compatibility(
1422 LedgerCompatibilityError::UnsupportedPhysicalFormat { .. }
1423 )
1424 ));
1425 assert!(store.physical.is_uninitialized());
1426 }
1427}