vox_core/
operation_store.rs1use std::collections::BTreeMap;
2
3use moire::sync::SyncMutex;
4use vox_types::{MaybeSend, MaybeSync, MethodId, OperationId, PostcardPayload};
5
6pub struct SealedResponse {
14 pub response: PostcardPayload,
16 pub method_id: MethodId,
19}
20
/// Coarse state of an operation as reported by [`OperationStore::lookup`].
///
/// The enum carries no data, so it derives the common value-type traits:
/// callers can compare states, copy them freely and print them in
/// diagnostics or test assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationState {
    /// The store has no record of the operation id.
    Unknown,
    /// The operation has been admitted but no response is stored for it.
    Admitted,
    /// A response has been stored for the operation.
    Sealed,
}
30
31pub trait OperationStore: MaybeSend + MaybeSync + 'static {
40 fn admit(&self, operation_id: OperationId);
42
43 fn lookup(&self, operation_id: OperationId) -> OperationState;
45
46 fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;
48
49 fn seal(&self, operation_id: OperationId, method_id: MethodId, response: &PostcardPayload);
54
55 fn remove(&self, operation_id: OperationId);
57}
58
59enum InMemoryState {
64 Admitted,
65 Sealed {
66 response: PostcardPayload,
67 method_id: MethodId,
68 },
69}
70
71pub struct InMemoryOperationStore {
73 inner: SyncMutex<InMemoryRegistry>,
74}
75
76#[derive(Default)]
77struct InMemoryRegistry {
78 operations: BTreeMap<OperationId, InMemoryState>,
79}
80
81impl InMemoryOperationStore {
82 pub fn new() -> Self {
83 Self::default()
84 }
85}
86
87impl Default for InMemoryOperationStore {
88 fn default() -> Self {
89 Self {
90 inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
91 }
92 }
93}
94
95impl OperationStore for InMemoryOperationStore {
96 fn admit(&self, operation_id: OperationId) {
97 let mut inner = self.inner.lock();
98 inner
99 .operations
100 .entry(operation_id)
101 .or_insert(InMemoryState::Admitted);
102 tracing::trace!(
103 %operation_id,
104 operations = inner.operations.len(),
105 "operation store admit"
106 );
107 }
108
109 fn lookup(&self, operation_id: OperationId) -> OperationState {
110 let inner = self.inner.lock();
111 match inner.operations.get(&operation_id) {
112 None => OperationState::Unknown,
113 Some(InMemoryState::Admitted) => OperationState::Admitted,
114 Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
115 }
116 }
117
118 fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
119 let inner = self.inner.lock();
120 match inner.operations.get(&operation_id) {
121 Some(InMemoryState::Sealed {
122 response,
123 method_id,
124 }) => Some(SealedResponse {
125 response: response.clone(),
126 method_id: *method_id,
127 }),
128 _ => None,
129 }
130 }
131
132 fn seal(&self, operation_id: OperationId, method_id: MethodId, response: &PostcardPayload) {
133 let mut inner = self.inner.lock();
134 inner.operations.insert(
135 operation_id,
136 InMemoryState::Sealed {
137 response: response.clone(),
138 method_id,
139 },
140 );
141 tracing::trace!(
142 %operation_id,
143 response_bytes = response.as_bytes().len(),
144 operations = inner.operations.len(),
145 "operation store seal"
146 );
147 }
148
149 fn remove(&self, operation_id: OperationId) {
150 let mut inner = self.inner.lock();
151 if matches!(
152 inner.operations.get(&operation_id),
153 Some(InMemoryState::Admitted)
154 ) {
155 inner.operations.remove(&operation_id);
156 tracing::trace!(
157 %operation_id,
158 operations = inner.operations.len(),
159 "operation store remove admitted"
160 );
161 }
162 }
163}