vox_core/
operation_store.rs1use std::collections::{BTreeMap, HashMap};
2
3use moire::sync::SyncMutex;
4use vox_types::{
5 MaybeSend, MaybeSync, OperationId, PostcardPayload, Schema, SchemaHash, SchemaRegistry,
6 SchemaSource, TypeRef,
7};
8
9pub struct SealedResponse {
11 pub response: PostcardPayload,
13 pub root_type: TypeRef,
15}
16
/// Coarse state of an operation, as reported by [`OperationStore::lookup`].
///
/// The enum carries no data, so it is cheap to copy and can be compared and
/// printed directly (useful in assertions and log output).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationState {
    /// The store has no record of the operation.
    Unknown,
    /// The operation has been admitted but no response has been sealed for it.
    Admitted,
    /// A response has been sealed for the operation.
    Sealed,
}
26
27pub trait OperationStore: MaybeSend + MaybeSync + 'static {
34 fn admit(&self, operation_id: OperationId);
36
37 fn lookup(&self, operation_id: OperationId) -> OperationState;
39
40 fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;
42
43 fn seal(
48 &self,
49 operation_id: OperationId,
50 response: &PostcardPayload,
51 root_type: &TypeRef,
52 registry: &SchemaRegistry,
53 );
54
55 fn remove(&self, operation_id: OperationId);
57
58 fn schema_source(&self) -> &dyn SchemaSource;
60}
61
62enum InMemoryState {
67 Admitted,
68 Sealed {
69 response: PostcardPayload,
70 root_type: TypeRef,
71 },
72}
73
74pub struct InMemoryOperationStore {
76 inner: SyncMutex<InMemoryRegistry>,
77}
78
79#[derive(Default)]
80struct InMemoryRegistry {
81 operations: BTreeMap<OperationId, InMemoryState>,
82 schemas: HashMap<SchemaHash, Schema>,
83}
84
85impl InMemoryOperationStore {
86 pub fn new() -> Self {
87 Self::default()
88 }
89}
90
91impl Default for InMemoryOperationStore {
92 fn default() -> Self {
93 Self {
94 inner: SyncMutex::new("driver.operations", InMemoryRegistry::default()),
95 }
96 }
97}
98
99impl SchemaSource for InMemoryOperationStore {
100 fn get_schema(&self, id: SchemaHash) -> Option<Schema> {
101 self.inner.lock().schemas.get(&id).cloned()
102 }
103}
104
105impl OperationStore for InMemoryOperationStore {
106 fn admit(&self, operation_id: OperationId) {
107 let mut inner = self.inner.lock();
108 inner
109 .operations
110 .entry(operation_id)
111 .or_insert(InMemoryState::Admitted);
112 }
113
114 fn lookup(&self, operation_id: OperationId) -> OperationState {
115 let inner = self.inner.lock();
116 match inner.operations.get(&operation_id) {
117 None => OperationState::Unknown,
118 Some(InMemoryState::Admitted) => OperationState::Admitted,
119 Some(InMemoryState::Sealed { .. }) => OperationState::Sealed,
120 }
121 }
122
123 fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
124 let inner = self.inner.lock();
125 match inner.operations.get(&operation_id) {
126 Some(InMemoryState::Sealed {
127 response,
128 root_type,
129 }) => Some(SealedResponse {
130 response: response.clone(),
131 root_type: root_type.clone(),
132 }),
133 _ => None,
134 }
135 }
136
137 fn seal(
138 &self,
139 operation_id: OperationId,
140 response: &PostcardPayload,
141 root_type: &TypeRef,
142 registry: &SchemaRegistry,
143 ) {
144 let mut inner = self.inner.lock();
145 let mut queue = Vec::new();
147 root_type.collect_ids(&mut queue);
148 let mut visited = std::collections::HashSet::new();
149 while let Some(id) = queue.pop() {
150 if !visited.insert(id) {
151 continue;
152 }
153 if inner.schemas.contains_key(&id) {
154 continue;
155 }
156 if let Some(schema) = registry.get(&id) {
157 for child_id in vox_types::schema_child_ids(&schema.kind) {
158 queue.push(child_id);
159 }
160 inner.schemas.insert(id, schema.clone());
161 }
162 }
163 inner.operations.insert(
164 operation_id,
165 InMemoryState::Sealed {
166 response: response.clone(),
167 root_type: root_type.clone(),
168 },
169 );
170 }
171
172 fn remove(&self, operation_id: OperationId) {
173 let mut inner = self.inner.lock();
174 if matches!(
175 inner.operations.get(&operation_id),
176 Some(InMemoryState::Admitted)
177 ) {
178 inner.operations.remove(&operation_id);
179 }
180 }
181
182 fn schema_source(&self) -> &dyn SchemaSource {
183 self
184 }
185}