Skip to main content

vox_core/
operation_store.rs

1use std::collections::BTreeMap;
2
3use moire::sync::SyncMutex;
4use vox_types::{MaybeSend, MaybeSync, MethodId, OperationId, PostcardPayload};
5
6/// A sealed response stored in the operation store.
7///
8/// The store is in-process and same-version: the running code's static
9/// `SHAPE` for `method_id` is the source of truth for the response's
10/// schemas, both at admission time and at replay time. The store does
11/// not freeze schemas alongside payloads — that would only matter for
12/// cross-process / cross-version replay, and we don't promise that.
13pub struct SealedResponse {
14    /// Postcard-encoded response payload (without schemas).
15    pub response: PostcardPayload,
16    /// Method this response was produced for. Replay derives the
17    /// response's static `&'static Shape` from this.
18    pub method_id: MethodId,
19}
20
21/// State of an operation in the store.
22pub enum OperationState {
23    /// Never seen this operation ID.
24    Unknown,
25    /// Operation was admitted but never sealed (crash/disconnect before completion).
26    Admitted,
27    /// Operation completed and response is available.
28    Sealed,
29}
30
31/// Operation state backing for exactly-once delivery across session
32/// resumption within a single process.
33///
34/// The default implementation is in-memory. Schemas are NOT stored —
35/// they come from the running code on replay. Cross-process /
36/// cross-version durability is the responsibility of `persist` methods,
37/// which would require a separate store implementation that grapples
38/// with schema migration; that contract is out of scope here.
39pub trait OperationStore: MaybeSend + MaybeSync + 'static {
40    /// Record that we're starting to process this operation.
41    fn admit(&self, operation_id: OperationId);
42
43    /// Check the state of an operation.
44    fn lookup(&self, operation_id: OperationId) -> OperationState;
45
46    /// Retrieve a sealed response.
47    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;
48
49    /// Store the sealed response for an operation. `response` is the
50    /// postcard-encoded payload without schemas; `method_id` is needed
51    /// at replay time to look up the response shape from the running
52    /// code.
53    fn seal(&self, operation_id: OperationId, method_id: MethodId, response: &PostcardPayload);
54
55    /// Remove an admitted (but not sealed) operation, e.g. after handler failure.
56    fn remove(&self, operation_id: OperationId);
57}
58
59// ============================================================================
60// In-memory implementation
61// ============================================================================
62
63enum InMemoryState {
64    Admitted,
65    Sealed {
66        response: PostcardPayload,
67        method_id: MethodId,
68    },
69}
70
71/// Default in-memory operation store.
72pub struct InMemoryOperationStore {
73    inner: SyncMutex<InMemoryRegistry>,
74}
75
76#[derive(Default)]
77struct InMemoryRegistry {
78    operations: BTreeMap<OperationId, InMemoryState>,
79}
80
81impl InMemoryOperationStore {
82    pub fn new() -> Self {
83        Self::default()
84    }
85}
86
87impl Default for InMemoryOperationStore {
88    fn default() -> Self {
89        Self {
90            inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
91        }
92    }
93}
94
95impl OperationStore for InMemoryOperationStore {
96    fn admit(&self, operation_id: OperationId) {
97        let mut inner = self.inner.lock();
98        inner
99            .operations
100            .entry(operation_id)
101            .or_insert(InMemoryState::Admitted);
102        tracing::trace!(
103            %operation_id,
104            operations = inner.operations.len(),
105            "operation store admit"
106        );
107    }
108
109    fn lookup(&self, operation_id: OperationId) -> OperationState {
110        let inner = self.inner.lock();
111        match inner.operations.get(&operation_id) {
112            None => OperationState::Unknown,
113            Some(InMemoryState::Admitted) => OperationState::Admitted,
114            Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
115        }
116    }
117
118    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
119        let inner = self.inner.lock();
120        match inner.operations.get(&operation_id) {
121            Some(InMemoryState::Sealed {
122                response,
123                method_id,
124            }) => Some(SealedResponse {
125                response: response.clone(),
126                method_id: *method_id,
127            }),
128            _ => None,
129        }
130    }
131
132    fn seal(&self, operation_id: OperationId, method_id: MethodId, response: &PostcardPayload) {
133        let mut inner = self.inner.lock();
134        inner.operations.insert(
135            operation_id,
136            InMemoryState::Sealed {
137                response: response.clone(),
138                method_id,
139            },
140        );
141        tracing::trace!(
142            %operation_id,
143            response_bytes = response.as_bytes().len(),
144            operations = inner.operations.len(),
145            "operation store seal"
146        );
147    }
148
149    fn remove(&self, operation_id: OperationId) {
150        let mut inner = self.inner.lock();
151        if matches!(
152            inner.operations.get(&operation_id),
153            Some(InMemoryState::Admitted)
154        ) {
155            inner.operations.remove(&operation_id);
156            tracing::trace!(
157                %operation_id,
158                operations = inner.operations.len(),
159                "operation store remove admitted"
160            );
161        }
162    }
163}