vox_core/
operation_store.rs1use std::collections::{BTreeMap, HashMap};
2
3use moire::sync::SyncMutex;
4use vox_types::{
5 MaybeSend, MaybeSync, OperationId, PostcardPayload, Schema, SchemaHash, SchemaRegistry,
6 SchemaSource, TypeRef,
7};
8
/// The stored outcome of a sealed operation, as handed back by
/// [`OperationStore::get_sealed`].
pub struct SealedResponse {
    /// Response payload recorded for the operation. The in-memory store
    /// fills this with a clone of the payload that was passed to
    /// [`OperationStore::seal`].
    pub response: PostcardPayload,
    /// Root type reference recorded together with the payload. The in-memory
    /// store fills this with a clone of the `root_type` passed to
    /// [`OperationStore::seal`].
    pub root_type: TypeRef,
}
16
/// Coarse state of an operation as reported by [`OperationStore::lookup`].
///
/// The enum only has unit variants, so it derives the cheap common traits:
/// callers can compare states with `==`, copy them freely and print them
/// with `{:?}` in logs and assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationState {
    /// The store has no entry for the operation.
    Unknown,
    /// The operation has been admitted but no response has been sealed yet.
    Admitted,
    /// A response has been sealed for the operation.
    Sealed,
}
26
/// Storage backend that tracks operations by [`OperationId`] and keeps the
/// response recorded for each sealed operation.
///
/// Every method takes `&self`, so an implementation has to provide its own
/// interior mutability (the in-memory implementation in this file wraps its
/// state in a mutex).
pub trait OperationStore: MaybeSend + MaybeSync + 'static {
    /// Records `operation_id` as admitted.
    ///
    /// NOTE(review): the in-memory implementation leaves an entry that
    /// already exists (admitted or sealed) untouched; whether all
    /// implementations are required to do the same is not stated here —
    /// confirm.
    fn admit(&self, operation_id: OperationId);

    /// Reports the current [`OperationState`] of `operation_id`.
    fn lookup(&self, operation_id: OperationId) -> OperationState;

    /// Returns the recorded response for `operation_id`, or `None` when the
    /// store holds no sealed response for it.
    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;

    /// Records `response` and `root_type` as the sealed outcome of
    /// `operation_id`.
    ///
    /// `registry` is the schema registry available to the caller at seal
    /// time. The in-memory implementation copies the schemas reachable from
    /// `root_type` out of it so they can later be served through
    /// [`OperationStore::schema_source`].
    fn seal(
        &self,
        operation_id: OperationId,
        response: &PostcardPayload,
        root_type: &TypeRef,
        registry: &SchemaRegistry,
    );

    /// Drops `operation_id` from the store.
    ///
    /// NOTE(review): the in-memory implementation only drops entries that
    /// are still admitted and keeps sealed ones; confirm whether that is part
    /// of the trait contract.
    fn remove(&self, operation_id: OperationId);

    /// Exposes the store as a [`SchemaSource`] for looking up stored schemas.
    fn schema_source(&self) -> &dyn SchemaSource;
}
61
/// Per-operation record kept by [`InMemoryOperationStore`]. An operation
/// that has no record at all is reported as [`OperationState::Unknown`].
enum InMemoryState {
    /// The operation was admitted and has no sealed response.
    Admitted,
    /// The operation was sealed; the variant carries what `seal` stored.
    Sealed {
        /// Clone of the response payload passed to `seal`.
        response: PostcardPayload,
        /// Clone of the root type reference passed to `seal`.
        root_type: TypeRef,
    },
}
73
/// [`OperationStore`] implementation that keeps all of its state in process
/// memory behind a single mutex.
pub struct InMemoryOperationStore {
    /// Operation records and stored schemas, guarded together by one lock.
    inner: SyncMutex<InMemoryRegistry>,
}
78
/// Mutable state of an [`InMemoryOperationStore`]; starts out empty.
#[derive(Default)]
struct InMemoryRegistry {
    /// One record per operation that is currently admitted or sealed.
    operations: BTreeMap<OperationId, InMemoryState>,
    /// Schemas copied out of the caller's registry while sealing, keyed by
    /// their hash. Nothing in this file removes entries from this map.
    schemas: HashMap<SchemaHash, Schema>,
}
84
85impl InMemoryOperationStore {
86 pub fn new() -> Self {
87 Self::default()
88 }
89}
90
91impl Default for InMemoryOperationStore {
92 fn default() -> Self {
93 Self {
94 inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
95 }
96 }
97}
98
99impl SchemaSource for InMemoryOperationStore {
100 fn get_schema(&self, id: SchemaHash) -> Option<Schema> {
101 self.inner.lock().schemas.get(&id).cloned()
102 }
103}
104
105impl OperationStore for InMemoryOperationStore {
106 fn admit(&self, operation_id: OperationId) {
107 let mut inner = self.inner.lock();
108 inner
109 .operations
110 .entry(operation_id)
111 .or_insert(InMemoryState::Admitted);
112 tracing::trace!(
113 %operation_id,
114 operations = inner.operations.len(),
115 schemas = inner.schemas.len(),
116 "operation store admit"
117 );
118 }
119
120 fn lookup(&self, operation_id: OperationId) -> OperationState {
121 let inner = self.inner.lock();
122 match inner.operations.get(&operation_id) {
123 None => OperationState::Unknown,
124 Some(InMemoryState::Admitted) => OperationState::Admitted,
125 Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
126 }
127 }
128
129 fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
130 let inner = self.inner.lock();
131 match inner.operations.get(&operation_id) {
132 Some(InMemoryState::Sealed {
133 response,
134 root_type,
135 }) => Some(SealedResponse {
136 response: response.clone(),
137 root_type: root_type.clone(),
138 }),
139 _ => None,
140 }
141 }
142
143 fn seal(
144 &self,
145 operation_id: OperationId,
146 response: &PostcardPayload,
147 root_type: &TypeRef,
148 registry: &SchemaRegistry,
149 ) {
150 let mut inner = self.inner.lock();
151 let mut queue = Vec::new();
153 root_type.collect_ids(&mut queue);
154 let mut visited = std::collections::HashSet::new();
155 while let Some(id) = queue.pop() {
156 if !visited.insert(id) {
157 continue;
158 }
159 if inner.schemas.contains_key(&id) {
160 continue;
161 }
162 if let Some(schema) = registry.get(&id) {
163 for child_id in vox_types::schema_child_ids(&schema.kind) {
164 queue.push(child_id);
165 }
166 inner.schemas.insert(id, schema.clone());
167 }
168 }
169 inner.operations.insert(
170 operation_id,
171 InMemoryState::Sealed {
172 response: response.clone(),
173 root_type: root_type.clone(),
174 },
175 );
176 tracing::trace!(
177 %operation_id,
178 response_bytes = response.as_bytes().len(),
179 operations = inner.operations.len(),
180 schemas = inner.schemas.len(),
181 "operation store seal"
182 );
183 }
184
185 fn remove(&self, operation_id: OperationId) {
186 let mut inner = self.inner.lock();
187 if matches!(
188 inner.operations.get(&operation_id),
189 Some(InMemoryState::Admitted)
190 ) {
191 inner.operations.remove(&operation_id);
192 tracing::trace!(
193 %operation_id,
194 operations = inner.operations.len(),
195 schemas = inner.schemas.len(),
196 "operation store remove admitted"
197 );
198 }
199 }
200
201 fn schema_source(&self) -> &dyn SchemaSource {
202 self
203 }
204}