pub mod memory;
#[cfg(feature = "redb-store")]
pub mod redb;
#[cfg(feature = "sqlite-store")]
pub mod sqlite;
use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
use sha2::{Digest, Sha256};
use crate::identifiers::LogicalRuntimeId;
use crate::input_state::InputState;
use crate::runtime_state::RuntimeState;
/// Errors reported by runtime store operations.
///
/// Every variant carries a free-form message string that is interpolated
/// into the `Display` text declared by its `#[error(...)]` attribute.
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum RuntimeStoreError {
/// A write to the store failed. `authoritative_receipt` in this module
/// also reports session snapshot (de)serialization failures through
/// this variant.
#[error("Store write failed: {0}")]
WriteFailed(String),
/// A read from the store failed.
#[error("Store read failed: {0}")]
ReadFailed(String),
/// The requested item was not found; the message describes what was missing.
#[error("Not found: {0}")]
NotFound(String),
/// An internal error, per the display text.
// NOTE(review): which failures map here is decided by the backends,
// which are not visible in this file.
#[error("Internal error: {0}")]
Internal(String),
}
/// Session payload handed to the store alongside a boundary commit.
#[derive(Debug, Clone)]
pub struct SessionDelta {
/// Serialized session bytes. `authoritative_receipt` decodes them as JSON
/// into a `meerkat_core::Session`.
// NOTE(review): despite the type name, the field holds a snapshot rather
// than an incremental diff, as far as this file shows — confirm.
pub session_snapshot: Vec<u8>,
}
fn authoritative_receipt(
session_delta: Option<&SessionDelta>,
run_id: RunId,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
sequence: u64,
) -> Result<RunBoundaryReceipt, RuntimeStoreError> {
let (conversation_digest, message_count) = match session_delta {
Some(delta) => {
let session: meerkat_core::Session = serde_json::from_slice(&delta.session_snapshot)
.map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
let encoded_messages = serde_json::to_vec(session.messages())
.map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
(
Some(format!("{:x}", Sha256::digest(encoded_messages))),
session.messages().len(),
)
}
None => (None, 0),
};
Ok(RunBoundaryReceipt {
run_id,
boundary,
contributing_input_ids,
conversation_digest,
message_count,
sequence,
})
}
/// Asynchronous persistence interface for runtime data, addressed by
/// `LogicalRuntimeId`.
///
/// The trait is expanded through `async_trait`. On `wasm32` the `?Send`
/// form of the macro is selected, so the generated futures are not required
/// to be `Send` on that target; implementors themselves must still be
/// `Send + Sync`.
///
/// All methods report failures as `RuntimeStoreError`. The precise
/// durability and atomicity guarantees belong to each backend and are not
/// visible from this definition.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait RuntimeStore: Send + Sync {
    /// Commits a session boundary for `runtime_id`, handing over the session
    /// delta, the run/boundary identity, the contributing input ids and the
    /// input state updates, and returns the resulting receipt.
    // NOTE(review): presumably the receipt is derived by the store itself
    // (it is a return value here, unlike in `atomic_apply`) — confirm
    // against the backends.
    async fn commit_session_boundary(
        &self,
        runtime_id: &LogicalRuntimeId,
        session_delta: SessionDelta,
        run_id: RunId,
        boundary: RunApplyBoundary,
        contributing_input_ids: Vec<InputId>,
        input_updates: Vec<InputState>,
    ) -> Result<RunBoundaryReceipt, RuntimeStoreError>;

    /// Applies a caller-supplied receipt together with an optional session
    /// delta and a batch of input state updates.
    // NOTE(review): the name implies all-or-nothing application, and
    // `session_store_key` presumably selects where the session is written —
    // neither is established by this file; confirm against the backends.
    async fn atomic_apply(
        &self,
        runtime_id: &LogicalRuntimeId,
        session_delta: Option<SessionDelta>,
        receipt: RunBoundaryReceipt,
        input_updates: Vec<InputState>,
        session_store_key: Option<meerkat_core::types::SessionId>,
    ) -> Result<(), RuntimeStoreError>;

    /// Loads the input states stored for `runtime_id`.
    async fn load_input_states(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<Vec<InputState>, RuntimeStoreError>;

    /// Looks up the boundary receipt identified by `run_id` and `sequence`
    /// under `runtime_id`. The `Option` return allows for "no such receipt".
    async fn load_boundary_receipt(
        &self,
        runtime_id: &LogicalRuntimeId,
        run_id: &RunId,
        sequence: u64,
    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;

    /// Loads the stored session snapshot bytes for `runtime_id`, if any.
    async fn load_session_snapshot(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;

    /// Persists a single input state under `runtime_id`.
    async fn persist_input_state(
        &self,
        runtime_id: &LogicalRuntimeId,
        state: &InputState,
    ) -> Result<(), RuntimeStoreError>;

    /// Loads the state of the input identified by `input_id`, if any.
    async fn load_input_state(
        &self,
        runtime_id: &LogicalRuntimeId,
        input_id: &InputId,
    ) -> Result<Option<InputState>, RuntimeStoreError>;

    /// Persists the runtime state for `runtime_id`.
    async fn persist_runtime_state(
        &self,
        runtime_id: &LogicalRuntimeId,
        state: RuntimeState,
    ) -> Result<(), RuntimeStoreError>;

    /// Loads the runtime state stored for `runtime_id`, if any.
    async fn load_runtime_state(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<Option<RuntimeState>, RuntimeStoreError>;

    /// Commits a runtime state together with a set of input states.
    // NOTE(review): the name implies the two are written as one atomic
    // unit — confirm against the backends.
    async fn atomic_lifecycle_commit(
        &self,
        runtime_id: &LogicalRuntimeId,
        runtime_state: RuntimeState,
        input_states: &[InputState],
    ) -> Result<(), RuntimeStoreError>;

    /// Persists an ops lifecycle snapshot.
    ///
    /// The default implementation stores nothing and reports success.
    async fn persist_ops_lifecycle(
        &self,
        runtime_id: &LogicalRuntimeId,
        snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
    ) -> Result<(), RuntimeStoreError> {
        // Deliberately unused: the default is a no-op.
        let _ = runtime_id;
        let _ = snapshot;
        Ok(())
    }

    /// Loads the persisted ops lifecycle snapshot, if any.
    ///
    /// The default implementation always returns `Ok(None)`.
    async fn load_ops_lifecycle(
        &self,
        runtime_id: &LogicalRuntimeId,
    ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
        // Deliberately unused: the default never has a snapshot to return.
        let _ = runtime_id;
        Ok(Option::None)
    }
}
pub use memory::InMemoryRuntimeStore;
#[cfg(feature = "redb-store")]
pub use redb::RedbRuntimeStore;
#[cfg(feature = "sqlite-store")]
pub use sqlite::SqliteRuntimeStore;