Skip to main content

vox_core/
operation_store.rs

1use std::collections::{BTreeMap, HashMap};
2
3use moire::sync::SyncMutex;
4use vox_types::{
5    MaybeSend, MaybeSync, OperationId, PostcardPayload, Schema, SchemaHash, SchemaRegistry,
6    SchemaSource, TypeRef,
7};
8
/// A sealed response stored in the operation store.
///
/// This is the value handed back by [`OperationStore::get_sealed`]. It carries
/// only the encoded payload and its root type; the schemas themselves are kept
/// separately by the store and looked up by hash.
pub struct SealedResponse {
    /// Postcard-encoded response payload (without schemas).
    pub response: PostcardPayload,
    /// Root type ref for rebuilding the schema payload on replay.
    pub root_type: TypeRef,
}
16
/// State of an operation in the store.
///
/// This is a plain, field-less summary of what the store knows about an
/// operation ID. It is cheap to copy and can be compared and debug-printed,
/// which lets callers write `store.lookup(id) == OperationState::Sealed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationState {
    /// Never seen this operation ID.
    Unknown,
    /// Operation was admitted but never sealed (crash/disconnect before completion).
    Admitted,
    /// Operation completed and response is available.
    Sealed,
}
26
/// Operation state backing for exactly-once delivery across session resumption.
///
/// The default implementation is in-memory. Applications that want durability
/// can implement this trait backed by a database.
///
/// Schemas are stored separately from payloads, deduplicated by SchemaHash.
///
/// All methods take `&self`, so implementations are expected to handle their
/// own interior mutability (the in-memory store uses a mutex).
pub trait OperationStore: MaybeSend + MaybeSync + 'static {
    /// Record that we're starting to process this operation.
    ///
    /// The in-memory implementation leaves an already-known operation
    /// (admitted or sealed) untouched.
    fn admit(&self, operation_id: OperationId);

    /// Check the state of an operation.
    ///
    /// Returns [`OperationState::Unknown`] for an ID the store has no record of.
    fn lookup(&self, operation_id: OperationId) -> OperationState;

    /// Retrieve a sealed response.
    ///
    /// The in-memory implementation returns `None` when the operation is
    /// unknown or has only been admitted.
    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;

    /// Store the sealed response for an operation.
    ///
    /// `response` is the postcard-encoded payload WITHOUT schemas.
    /// The store pulls needed schemas from `registry`, deduplicated by SchemaHash.
    ///
    /// `root_type` is kept alongside the payload so the schema payload can be
    /// rebuilt when the response is replayed.
    fn seal(
        &self,
        operation_id: OperationId,
        response: &PostcardPayload,
        root_type: &TypeRef,
        registry: &SchemaRegistry,
    );

    /// Remove an admitted (but not sealed) operation, e.g. after handler failure.
    ///
    /// The in-memory implementation does nothing for sealed or unknown operations.
    fn remove(&self, operation_id: OperationId);

    /// Access the store's schema source for looking up schemas by hash.
    fn schema_source(&self) -> &dyn SchemaSource;
}
61
62// ============================================================================
63// In-memory implementation
64// ============================================================================
65
/// Per-operation record kept by the in-memory store.
enum InMemoryState {
    /// The operation was admitted and has not been sealed.
    Admitted,
    /// The operation was sealed; holds what is needed to build a `SealedResponse`.
    Sealed {
        /// Copy of the postcard-encoded payload passed to `seal`.
        response: PostcardPayload,
        /// Copy of the root type ref passed to `seal`.
        root_type: TypeRef,
    },
}
73
/// Default in-memory operation store.
///
/// All state sits behind a single `SyncMutex`, so every trait method takes the
/// lock for the duration of the call.
pub struct InMemoryOperationStore {
    /// Operation records and stored schemas, guarded by one lock.
    inner: SyncMutex<InMemoryRegistry>,
}
78
/// Mutable state of the in-memory store; starts out empty via `Default`.
#[derive(Default)]
struct InMemoryRegistry {
    /// State of each known operation, keyed by operation ID.
    operations: BTreeMap<OperationId, InMemoryState>,
    /// Schemas copied from the registry when sealing, keyed by hash so each is stored once.
    schemas: HashMap<SchemaHash, Schema>,
}
84
85impl InMemoryOperationStore {
86    pub fn new() -> Self {
87        Self::default()
88    }
89}
90
91impl Default for InMemoryOperationStore {
92    fn default() -> Self {
93        Self {
94            inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
95        }
96    }
97}
98
99impl SchemaSource for InMemoryOperationStore {
100    fn get_schema(&self, id: SchemaHash) -> Option<Schema> {
101        self.inner.lock().schemas.get(&id).cloned()
102    }
103}
104
105impl OperationStore for InMemoryOperationStore {
106    fn admit(&self, operation_id: OperationId) {
107        let mut inner = self.inner.lock();
108        inner
109            .operations
110            .entry(operation_id)
111            .or_insert(InMemoryState::Admitted);
112    }
113
114    fn lookup(&self, operation_id: OperationId) -> OperationState {
115        let inner = self.inner.lock();
116        match inner.operations.get(&operation_id) {
117            None => OperationState::Unknown,
118            Some(InMemoryState::Admitted) => OperationState::Admitted,
119            Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
120        }
121    }
122
123    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
124        let inner = self.inner.lock();
125        match inner.operations.get(&operation_id) {
126            Some(InMemoryState::Sealed {
127                response,
128                root_type,
129            }) => Some(SealedResponse {
130                response: response.clone(),
131                root_type: root_type.clone(),
132            }),
133            _ => None,
134        }
135    }
136
137    fn seal(
138        &self,
139        operation_id: OperationId,
140        response: &PostcardPayload,
141        root_type: &TypeRef,
142        registry: &SchemaRegistry,
143    ) {
144        let mut inner = self.inner.lock();
145        // Store schemas the store doesn't have yet.
146        let mut queue = Vec::new();
147        root_type.collect_ids(&mut queue);
148        let mut visited = std::collections::HashSet::new();
149        while let Some(id) = queue.pop() {
150            if !visited.insert(id) {
151                continue;
152            }
153            if inner.schemas.contains_key(&id) {
154                continue;
155            }
156            if let Some(schema) = registry.get(&id) {
157                for child_id in vox_types::schema_child_ids(&schema.kind) {
158                    queue.push(child_id);
159                }
160                inner.schemas.insert(id, schema.clone());
161            }
162        }
163        inner.operations.insert(
164            operation_id,
165            InMemoryState::Sealed {
166                response: response.clone(),
167                root_type: root_type.clone(),
168            },
169        );
170    }
171
172    fn remove(&self, operation_id: OperationId) {
173        let mut inner = self.inner.lock();
174        if matches!(
175            inner.operations.get(&operation_id),
176            Some(InMemoryState::Admitted)
177        ) {
178            inner.operations.remove(&operation_id);
179        }
180    }
181
182    fn schema_source(&self) -> &dyn SchemaSource {
183        self
184    }
185}