Skip to main content

vox_core/
operation_store.rs

1use std::collections::{BTreeMap, HashMap};
2
3use moire::sync::SyncMutex;
4use vox_types::{
5    MaybeSend, MaybeSync, OperationId, PostcardPayload, Schema, SchemaHash, SchemaRegistry,
6    SchemaSource, TypeRef,
7};
8
9/// A sealed response stored in the operation store.
10pub struct SealedResponse {
11    /// Postcard-encoded response payload (without schemas).
12    pub response: PostcardPayload,
13    /// Root type ref for rebuilding the schema payload on replay.
14    pub root_type: TypeRef,
15}
16
/// State of an operation in the store.
///
/// This is a plain, fieldless status value, so it derives the common traits:
/// callers can copy it freely, compare it with `==`, and print it in logs or
/// assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationState {
    /// Never seen this operation ID.
    Unknown,
    /// Operation was admitted but never sealed (crash/disconnect before completion).
    Admitted,
    /// Operation completed and response is available.
    Sealed,
}
26
27/// Operation state backing for exactly-once delivery across session resumption.
28///
29/// The default implementation is in-memory. Applications that want durability
30/// can implement this trait backed by a database.
31///
32/// Schemas are stored separately from payloads, deduplicated by SchemaHash.
33pub trait OperationStore: MaybeSend + MaybeSync + 'static {
34    /// Record that we're starting to process this operation.
35    fn admit(&self, operation_id: OperationId);
36
37    /// Check the state of an operation.
38    fn lookup(&self, operation_id: OperationId) -> OperationState;
39
40    /// Retrieve a sealed response.
41    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;
42
43    /// Store the sealed response for an operation.
44    ///
45    /// `response` is the postcard-encoded payload WITHOUT schemas.
46    /// The store pulls needed schemas from `registry`, deduplicated by SchemaHash.
47    fn seal(
48        &self,
49        operation_id: OperationId,
50        response: &PostcardPayload,
51        root_type: &TypeRef,
52        registry: &SchemaRegistry,
53    );
54
55    /// Remove an admitted (but not sealed) operation, e.g. after handler failure.
56    fn remove(&self, operation_id: OperationId);
57
58    /// Access the store's schema source for looking up schemas by hash.
59    fn schema_source(&self) -> &dyn SchemaSource;
60}
61
62// ============================================================================
63// In-memory implementation
64// ============================================================================
65
66enum InMemoryState {
67    Admitted,
68    Sealed {
69        response: PostcardPayload,
70        root_type: TypeRef,
71    },
72}
73
74/// Default in-memory operation store.
75pub struct InMemoryOperationStore {
76    inner: SyncMutex<InMemoryRegistry>,
77}
78
79#[derive(Default)]
80struct InMemoryRegistry {
81    operations: BTreeMap<OperationId, InMemoryState>,
82    schemas: HashMap<SchemaHash, Schema>,
83}
84
85impl InMemoryOperationStore {
86    pub fn new() -> Self {
87        Self::default()
88    }
89}
90
91impl Default for InMemoryOperationStore {
92    fn default() -> Self {
93        Self {
94            inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
95        }
96    }
97}
98
99impl SchemaSource for InMemoryOperationStore {
100    fn get_schema(&self, id: SchemaHash) -> Option<Schema> {
101        self.inner.lock().schemas.get(&id).cloned()
102    }
103}
104
105impl OperationStore for InMemoryOperationStore {
106    fn admit(&self, operation_id: OperationId) {
107        let mut inner = self.inner.lock();
108        inner
109            .operations
110            .entry(operation_id)
111            .or_insert(InMemoryState::Admitted);
112        tracing::trace!(
113            %operation_id,
114            operations = inner.operations.len(),
115            schemas = inner.schemas.len(),
116            "operation store admit"
117        );
118    }
119
120    fn lookup(&self, operation_id: OperationId) -> OperationState {
121        let inner = self.inner.lock();
122        match inner.operations.get(&operation_id) {
123            None => OperationState::Unknown,
124            Some(InMemoryState::Admitted) => OperationState::Admitted,
125            Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
126        }
127    }
128
129    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
130        let inner = self.inner.lock();
131        match inner.operations.get(&operation_id) {
132            Some(InMemoryState::Sealed {
133                response,
134                root_type,
135            }) => Some(SealedResponse {
136                response: response.clone(),
137                root_type: root_type.clone(),
138            }),
139            _ => None,
140        }
141    }
142
143    fn seal(
144        &self,
145        operation_id: OperationId,
146        response: &PostcardPayload,
147        root_type: &TypeRef,
148        registry: &SchemaRegistry,
149    ) {
150        let mut inner = self.inner.lock();
151        // Store schemas the store doesn't have yet.
152        let mut queue = Vec::new();
153        root_type.collect_ids(&mut queue);
154        let mut visited = std::collections::HashSet::new();
155        while let Some(id) = queue.pop() {
156            if !visited.insert(id) {
157                continue;
158            }
159            if inner.schemas.contains_key(&id) {
160                continue;
161            }
162            if let Some(schema) = registry.get(&id) {
163                for child_id in vox_types::schema_child_ids(&schema.kind) {
164                    queue.push(child_id);
165                }
166                inner.schemas.insert(id, schema.clone());
167            }
168        }
169        inner.operations.insert(
170            operation_id,
171            InMemoryState::Sealed {
172                response: response.clone(),
173                root_type: root_type.clone(),
174            },
175        );
176        tracing::trace!(
177            %operation_id,
178            response_bytes = response.as_bytes().len(),
179            operations = inner.operations.len(),
180            schemas = inner.schemas.len(),
181            "operation store seal"
182        );
183    }
184
185    fn remove(&self, operation_id: OperationId) {
186        let mut inner = self.inner.lock();
187        if matches!(
188            inner.operations.get(&operation_id),
189            Some(InMemoryState::Admitted)
190        ) {
191            inner.operations.remove(&operation_id);
192            tracing::trace!(
193                %operation_id,
194                operations = inner.operations.len(),
195                schemas = inner.schemas.len(),
196                "operation store remove admitted"
197            );
198        }
199    }
200
201    fn schema_source(&self) -> &dyn SchemaSource {
202        self
203    }
204}