use std::collections::{BTreeMap, HashMap};
use moire::sync::SyncMutex;
use vox_types::{
MaybeSend, MaybeSync, OperationId, PostcardPayload, Schema, SchemaHash, SchemaRegistry,
SchemaSource, TypeRef,
};
/// A response that has been sealed for an operation, as handed back by
/// [`OperationStore::get_sealed`].
pub struct SealedResponse {
/// The stored response payload.
pub response: PostcardPayload,
/// Type reference recorded alongside the payload when it was sealed.
pub root_type: TypeRef,
}
/// Coarse status of an operation as reported by [`OperationStore::lookup`].
pub enum OperationState {
/// The store has no entry for the operation id.
Unknown,
/// The operation has an entry but no sealed response yet.
Admitted,
/// A response has been sealed for the operation.
Sealed,
}
/// Storage interface for tracking operations and their sealed responses.
///
/// The per-method notes below describe the behavior of the in-memory
/// implementation in this file; other implementations are presumably expected
/// to match it, but the trait itself does not enforce that.
pub trait OperationStore: MaybeSend + MaybeSync + 'static {
/// Registers `operation_id` as admitted. The in-memory implementation leaves
/// an already-tracked id (admitted or sealed) untouched.
fn admit(&self, operation_id: OperationId);
/// Reports whether `operation_id` is unknown, admitted, or sealed.
fn lookup(&self, operation_id: OperationId) -> OperationState;
/// Returns the sealed response for `operation_id`, or `None` when the
/// operation is not sealed (including when it is unknown).
fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse>;
/// Stores `response` and `root_type` as the sealed result of `operation_id`.
///
/// `registry` supplies schemas; the in-memory implementation copies the
/// schemas reachable from `root_type` into the store so they can later be
/// served through [`OperationStore::schema_source`].
fn seal(
&self,
operation_id: OperationId,
response: &PostcardPayload,
root_type: &TypeRef,
registry: &SchemaRegistry,
);
/// Drops `operation_id` from the store. The in-memory implementation only
/// removes entries that are still admitted; sealed entries are kept.
fn remove(&self, operation_id: OperationId);
/// Exposes the store's schemas as a [`SchemaSource`].
fn schema_source(&self) -> &dyn SchemaSource;
}
/// Per-operation record kept by [`InMemoryOperationStore`].
enum InMemoryState {
/// The operation was admitted and has no response stored.
Admitted,
/// The operation was sealed; holds owned copies of what `seal` was given.
Sealed {
/// Stored response payload.
response: PostcardPayload,
/// Type reference stored with the payload.
root_type: TypeRef,
},
}
/// [`OperationStore`] implementation that keeps all state in process memory.
pub struct InMemoryOperationStore {
/// Operations and schemas, guarded by a single mutex; every method locks it.
inner: SyncMutex<InMemoryRegistry>,
}
/// Mutable state behind the mutex of [`InMemoryOperationStore`].
#[derive(Default)]
struct InMemoryRegistry {
/// Tracked operations keyed by id.
operations: BTreeMap<OperationId, InMemoryState>,
/// Schemas copied in by `seal`, keyed by hash. In the code visible here,
/// entries are only ever added, never removed.
schemas: HashMap<SchemaHash, Schema>,
}
impl InMemoryOperationStore {
    /// Creates an empty store; equivalent to the [`Default`] implementation.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl Default for InMemoryOperationStore {
    /// Builds a store with no operations and no schemas.
    fn default() -> Self {
        let empty = InMemoryRegistry::default();
        // The string is the label passed to the mutex constructor.
        let inner = SyncMutex::new("driver.operations", empty);
        Self { inner }
    }
}
impl SchemaSource for InMemoryOperationStore {
    /// Returns a clone of the schema stored under `id`, or `None` if the store
    /// has not recorded one.
    fn get_schema(&self, id: SchemaHash) -> Option<Schema> {
        let guard = self.inner.lock();
        guard.schemas.get(&id).cloned()
    }
}
impl OperationStore for InMemoryOperationStore {
    /// Records `operation_id` as admitted unless the store already tracks it.
    ///
    /// An existing entry, whether admitted or sealed, is left untouched.
    fn admit(&self, operation_id: OperationId) {
        let mut guard = self.inner.lock();
        if let std::collections::btree_map::Entry::Vacant(slot) =
            guard.operations.entry(operation_id)
        {
            slot.insert(InMemoryState::Admitted);
        }
        tracing::trace!(
            %operation_id,
            operations = guard.operations.len(),
            schemas = guard.schemas.len(),
            "operation store admit"
        );
    }

    /// Maps the stored record for `operation_id` to its public state, yielding
    /// [`OperationState::Unknown`] when there is no record.
    fn lookup(&self, operation_id: OperationId) -> OperationState {
        let guard = self.inner.lock();
        guard
            .operations
            .get(&operation_id)
            .map_or(OperationState::Unknown, |record| match record {
                InMemoryState::Admitted => OperationState::Admitted,
                InMemoryState::Sealed { .. } => OperationState::Sealed,
            })
    }

    /// Returns clones of the sealed payload and root type for `operation_id`,
    /// or `None` when the operation is unknown or only admitted.
    fn get_sealed(&self, operation_id: OperationId) -> Option<SealedResponse> {
        let guard = self.inner.lock();
        let Some(InMemoryState::Sealed {
            response,
            root_type,
        }) = guard.operations.get(&operation_id)
        else {
            return None;
        };
        Some(SealedResponse {
            response: response.clone(),
            root_type: root_type.clone(),
        })
    }

    /// Seals `operation_id` with `response` and `root_type`, replacing whatever
    /// record existed before.
    ///
    /// Before storing the record, the schema ids collected from `root_type` are
    /// walked depth-first; every schema found in `registry` that the store does
    /// not hold yet is copied in, together with its children.
    fn seal(
        &self,
        operation_id: OperationId,
        response: &PostcardPayload,
        root_type: &TypeRef,
        registry: &SchemaRegistry,
    ) {
        let mut guard = self.inner.lock();

        let mut pending = Vec::new();
        root_type.collect_ids(&mut pending);
        let mut seen = std::collections::HashSet::new();

        while let Some(id) = pending.pop() {
            // Each id is handled once; ids the store already holds are not
            // descended into again.
            let first_visit = seen.insert(id);
            if !first_visit || guard.schemas.contains_key(&id) {
                continue;
            }
            // Ids missing from the registry are silently skipped.
            let Some(schema) = registry.get(&id) else {
                continue;
            };
            pending.extend(vox_types::schema_child_ids(&schema.kind));
            guard.schemas.insert(id, schema.clone());
        }

        let sealed = InMemoryState::Sealed {
            response: response.clone(),
            root_type: root_type.clone(),
        };
        guard.operations.insert(operation_id, sealed);

        tracing::trace!(
            %operation_id,
            response_bytes = response.as_bytes().len(),
            operations = guard.operations.len(),
            schemas = guard.schemas.len(),
            "operation store seal"
        );
    }

    /// Forgets `operation_id` if it is still only admitted.
    ///
    /// Sealed and unknown operations are left as they are, and nothing is
    /// logged for them.
    fn remove(&self, operation_id: OperationId) {
        let mut guard = self.inner.lock();
        match guard.operations.get(&operation_id) {
            Some(InMemoryState::Admitted) => {}
            Some(InMemoryState::Sealed { .. }) | None => return,
        }
        guard.operations.remove(&operation_id);
        tracing::trace!(
            %operation_id,
            operations = guard.operations.len(),
            schemas = guard.schemas.len(),
            "operation store remove admitted"
        );
    }

    /// The store itself serves as the schema source.
    fn schema_source(&self) -> &dyn SchemaSource {
        self
    }
}