pub mod memory;
#[cfg(feature = "sqlite-store")]
pub mod sqlite;
use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
use crate::identifiers::LogicalRuntimeId;
use crate::input_state::{InputStatePersistenceRecord, StoredInputState};
use crate::runtime_state::RuntimeState;
/// Version number written into the `record_version` field of every encoded
/// machine lifecycle record, and the only version accepted when such a record
/// is converted back into a `MachineLifecycleSnapshot`.
const MACHINE_LIFECYCLE_STORE_RECORD_VERSION: u16 = 1;
/// Errors returned by [`RuntimeStore`] operations and by the helpers in this module.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum RuntimeStoreError {
/// A write failed; the payload is a free-form message. In this module it is
/// produced when JSON encoding of a machine lifecycle record fails.
#[error("Store write failed: {0}")]
WriteFailed(String),
/// A read failed; the payload is a free-form message. In this module it is
/// produced when a machine lifecycle record cannot be parsed as JSON or
/// carries an unsupported record version.
#[error("Store read failed: {0}")]
ReadFailed(String),
/// The session id a store holds (`actual`) differs from the one the caller
/// supplied (`expected`).
#[error("Session store key mismatch: expected {expected}, actual {actual}")]
SessionKeyMismatch {
expected: meerkat_core::types::SessionId,
actual: meerkat_core::types::SessionId,
},
/// The requested item does not exist; the payload describes what was looked up.
#[error("Not found: {0}")]
NotFound(String),
/// The store does not implement the operation named in the payload. The
/// default bodies of the optional [`RuntimeStore`] methods return this
/// variant with the method name as the payload.
#[error("Unsupported store operation: {0}")]
Unsupported(String),
/// The transcript revision found (`actual`) differs from the one the caller
/// expected (`expected`).
#[error("Transcript revision conflict: expected {expected}, actual {actual}")]
TranscriptRevisionConflict { expected: String, actual: String },
/// Any other failure; the payload is a free-form message.
#[error("Internal error: {0}")]
Internal(String),
}
/// Unsized callback type accepted (behind `&mut`) by
/// [`RuntimeStore::update_auth_oauth_flow_snapshot`].
///
/// The callback receives an optional byte slice and returns either a new byte
/// vector or a [`RuntimeStoreError`]. Presumably the argument is the currently
/// stored snapshot (`None` when absent) and the return value is its
/// replacement; confirm against the store implementations.
pub type AuthOAuthFlowSnapshotUpdate<'a> =
dyn FnMut(Option<&[u8]>) -> Result<Vec<u8>, RuntimeStoreError> + 'a;
/// Session payload handed to the session-related [`RuntimeStore`] methods.
#[derive(Debug, Clone)]
pub struct SessionDelta {
/// Session snapshot as raw bytes; this module never inspects the contents.
pub session_snapshot: Vec<u8>,
}
/// Optional identity and fencing facts stored next to a machine lifecycle state.
///
/// Each fact is independently optional; the derived `Default` produces a value
/// in which all four are absent.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MachineLifecycleBindingFacts {
    /// Agent runtime identifier, when one was recorded.
    agent_runtime_id: Option<String>,
    /// Fence token, when one was recorded.
    fence_token: Option<u64>,
    /// Runtime generation, when one was recorded.
    runtime_generation: Option<u64>,
    /// Runtime epoch identifier, when one was recorded.
    runtime_epoch_id: Option<String>,
}

impl MachineLifecycleBindingFacts {
    /// Assembles binding facts from their four optional components.
    pub(crate) fn new(
        agent_runtime_id: Option<String>,
        fence_token: Option<u64>,
        runtime_generation: Option<u64>,
        runtime_epoch_id: Option<String>,
    ) -> Self {
        MachineLifecycleBindingFacts {
            agent_runtime_id,
            fence_token,
            runtime_generation,
            runtime_epoch_id,
        }
    }

    /// Borrows the agent runtime identifier, or `None` when it is absent.
    pub fn agent_runtime_id(&self) -> Option<&str> {
        let Self {
            agent_runtime_id, ..
        } = self;
        agent_runtime_id.as_deref()
    }

    /// Returns a copy of the fence token, or `None` when it is absent.
    pub fn fence_token(&self) -> Option<u64> {
        let Self { fence_token, .. } = self;
        *fence_token
    }

    /// Returns a copy of the runtime generation, or `None` when it is absent.
    pub fn runtime_generation(&self) -> Option<u64> {
        let Self {
            runtime_generation, ..
        } = self;
        *runtime_generation
    }

    /// Borrows the runtime epoch identifier, or `None` when it is absent.
    pub fn runtime_epoch_id(&self) -> Option<&str> {
        let Self {
            runtime_epoch_id, ..
        } = self;
        runtime_epoch_id.as_deref()
    }
}
/// A runtime state paired with the binding facts captured alongside it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MachineLifecycleSnapshot {
    /// Lifecycle state of the runtime.
    runtime_state: RuntimeState,
    /// Binding facts recorded together with `runtime_state`.
    binding: MachineLifecycleBindingFacts,
}

impl MachineLifecycleSnapshot {
    /// Pairs a runtime state with its binding facts.
    pub(crate) fn new(runtime_state: RuntimeState, binding: MachineLifecycleBindingFacts) -> Self {
        MachineLifecycleSnapshot {
            runtime_state,
            binding,
        }
    }

    /// Returns the runtime state by value.
    pub fn runtime_state(&self) -> RuntimeState {
        let Self { runtime_state, .. } = self;
        *runtime_state
    }

    /// Borrows the binding facts.
    pub fn binding(&self) -> &MachineLifecycleBindingFacts {
        let Self { binding, .. } = self;
        binding
    }
}
/// Serde representation of `MachineLifecycleBindingFacts` used inside the
/// stored machine lifecycle record. It has the same four fields as the domain
/// type; the field names here are the serialized key names.
#[derive(serde::Serialize, serde::Deserialize)]
struct MachineLifecycleBindingFactsStoreWire {
// Each field mirrors the identically named field of `MachineLifecycleBindingFacts`.
agent_runtime_id: Option<String>,
fence_token: Option<u64>,
runtime_generation: Option<u64>,
runtime_epoch_id: Option<String>,
}
impl From<&MachineLifecycleBindingFacts> for MachineLifecycleBindingFactsStoreWire {
    /// Copies the binding facts into their serde wire form, allocating owned
    /// strings for the two identifier fields.
    fn from(binding: &MachineLifecycleBindingFacts) -> Self {
        // The accessors hand out `&str`; the wire struct owns its strings.
        let agent_runtime_id = binding.agent_runtime_id().map(String::from);
        let runtime_epoch_id = binding.runtime_epoch_id().map(String::from);
        let fence_token = binding.fence_token();
        let runtime_generation = binding.runtime_generation();
        Self {
            agent_runtime_id,
            fence_token,
            runtime_generation,
            runtime_epoch_id,
        }
    }
}
impl From<MachineLifecycleBindingFactsStoreWire> for MachineLifecycleBindingFacts {
    /// Moves the wire fields into the domain type without cloning.
    fn from(binding: MachineLifecycleBindingFactsStoreWire) -> Self {
        let agent_runtime_id = binding.agent_runtime_id;
        let fence_token = binding.fence_token;
        let runtime_generation = binding.runtime_generation;
        let runtime_epoch_id = binding.runtime_epoch_id;
        Self::new(
            agent_runtime_id,
            fence_token,
            runtime_generation,
            runtime_epoch_id,
        )
    }
}
/// Serde representation of a stored machine lifecycle record: a version tag
/// plus the snapshot's runtime state and binding facts.
#[derive(serde::Serialize, serde::Deserialize)]
struct MachineLifecycleSnapshotStoreWire {
// Set to `MACHINE_LIFECYCLE_STORE_RECORD_VERSION` on encode and checked against it on decode.
record_version: u16,
runtime_state: RuntimeState,
binding: MachineLifecycleBindingFactsStoreWire,
}
impl From<&MachineLifecycleSnapshot> for MachineLifecycleSnapshotStoreWire {
    /// Builds the wire record for a snapshot, stamping it with the current
    /// record version.
    fn from(snapshot: &MachineLifecycleSnapshot) -> Self {
        let runtime_state = snapshot.runtime_state();
        let binding = snapshot.binding().into();
        Self {
            record_version: MACHINE_LIFECYCLE_STORE_RECORD_VERSION,
            runtime_state,
            binding,
        }
    }
}
impl TryFrom<MachineLifecycleSnapshotStoreWire> for MachineLifecycleSnapshot {
    type Error = RuntimeStoreError;

    /// Converts a decoded wire record into a snapshot.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeStoreError::ReadFailed` when the record's version is
    /// not `MACHINE_LIFECYCLE_STORE_RECORD_VERSION`.
    fn try_from(record: MachineLifecycleSnapshotStoreWire) -> Result<Self, Self::Error> {
        let found_version = record.record_version;
        if found_version == MACHINE_LIFECYCLE_STORE_RECORD_VERSION {
            Ok(Self::new(record.runtime_state, record.binding.into()))
        } else {
            let message = format!(
                "unsupported machine lifecycle store record version {}",
                found_version
            );
            Err(RuntimeStoreError::ReadFailed(message))
        }
    }
}
/// Parses stored bytes as a JSON machine lifecycle record and converts it
/// into a snapshot.
///
/// # Errors
///
/// Returns `RuntimeStoreError::ReadFailed` when the bytes are not a valid
/// JSON record, and propagates the conversion error when the record's version
/// is rejected.
fn decode_machine_lifecycle_store_record(
    bytes: &[u8],
) -> Result<MachineLifecycleSnapshot, RuntimeStoreError> {
    match serde_json::from_slice::<MachineLifecycleSnapshotStoreWire>(bytes) {
        Ok(wire) => MachineLifecycleSnapshot::try_from(wire),
        Err(parse_error) => Err(RuntimeStoreError::ReadFailed(parse_error.to_string())),
    }
}
/// Loads the machine lifecycle snapshot for `runtime_id` and returns only its
/// runtime state.
///
/// Yields `Ok(None)` when the store holds no lifecycle record for the runtime.
///
/// # Errors
///
/// Propagates any error from `load_machine_lifecycle`.
pub async fn load_runtime_state(
    store: &dyn RuntimeStore,
    runtime_id: &LogicalRuntimeId,
) -> Result<Option<RuntimeState>, RuntimeStoreError> {
    let lifecycle = load_machine_lifecycle(store, runtime_id).await?;
    Ok(lifecycle.map(|snapshot| snapshot.runtime_state()))
}
/// Fetches the raw machine lifecycle record for `runtime_id` from `store` and
/// decodes it into a snapshot.
///
/// Yields `Ok(None)` when the store returns no record.
///
/// # Errors
///
/// Propagates the store's load error, or the decode error when the stored
/// bytes cannot be turned into a snapshot.
pub(crate) async fn load_machine_lifecycle(
    store: &dyn RuntimeStore,
    runtime_id: &LogicalRuntimeId,
) -> Result<Option<MachineLifecycleSnapshot>, RuntimeStoreError> {
    match store.load_machine_lifecycle_record(runtime_id).await? {
        Some(bytes) => decode_machine_lifecycle_store_record(&bytes).map(Some),
        None => Ok(None),
    }
}
/// An owned machine lifecycle snapshot that can be encoded into the bytes a
/// store persists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MachineLifecycleStoreRecord {
    /// Snapshot that `encode` serializes.
    snapshot: MachineLifecycleSnapshot,
}

impl MachineLifecycleStoreRecord {
    /// Creates a record holding a clone of `snapshot`.
    pub(crate) fn from_snapshot(snapshot: &MachineLifecycleSnapshot) -> Self {
        let snapshot = snapshot.clone();
        MachineLifecycleStoreRecord { snapshot }
    }

    /// Serializes the record as JSON bytes in the versioned wire format.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeStoreError::WriteFailed` when JSON serialization fails.
    pub fn encode(&self) -> Result<Vec<u8>, RuntimeStoreError> {
        let wire = MachineLifecycleSnapshotStoreWire::from(&self.snapshot);
        match serde_json::to_vec(&wire) {
            Ok(bytes) => Ok(bytes),
            Err(encode_error) => Err(RuntimeStoreError::WriteFailed(encode_error.to_string())),
        }
    }
}
/// The machine lifecycle snapshot handed to
/// `RuntimeStore::commit_machine_lifecycle`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MachineLifecycleCommit {
    /// Snapshot carried by this commit.
    snapshot: MachineLifecycleSnapshot,
}

impl MachineLifecycleCommit {
    /// Builds a commit from a runtime state and its binding facts.
    pub(crate) fn new_with_binding(
        runtime_state: RuntimeState,
        binding: MachineLifecycleBindingFacts,
    ) -> Self {
        let snapshot = MachineLifecycleSnapshot::new(runtime_state, binding);
        MachineLifecycleCommit { snapshot }
    }

    /// Returns the runtime state of the carried snapshot.
    pub fn runtime_state(&self) -> RuntimeState {
        self.snapshot().runtime_state()
    }

    /// Borrows the carried snapshot.
    pub fn snapshot(&self) -> &MachineLifecycleSnapshot {
        let Self { snapshot } = self;
        snapshot
    }

    /// Produces an encodable store record holding a clone of the snapshot.
    pub fn store_record(&self) -> MachineLifecycleStoreRecord {
        MachineLifecycleStoreRecord::from_snapshot(self.snapshot())
    }

    /// Consumes the commit and yields its snapshot.
    pub(crate) fn into_snapshot(self) -> MachineLifecycleSnapshot {
        let Self { snapshot } = self;
        snapshot
    }
}
/// Storage interface for runtime persistence; implementors must be `Send + Sync`.
///
/// The `async_trait` attribute is applied in its `?Send` form on `wasm32`
/// targets and in its default form everywhere else.
///
/// Methods declared with a body are optional: apart from
/// `auth_authority_key` (which defaults to `None`), each default body returns
/// `RuntimeStoreError::Unsupported` carrying the method's own name. Methods
/// declared without a body must be provided by every implementor; their
/// descriptions below are inferred from names and signatures and should be
/// confirmed against the concrete stores.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait RuntimeStore: Send + Sync {
/// Optional auth authority key for this store. Defaults to `None`.
fn auth_authority_key(&self) -> Option<String> {
None
}
/// Synchronous hook that receives an OAuth flow snapshot as bytes
/// (presumably JSON, going by the parameter name; confirm with implementors).
///
/// The default body ignores the argument and returns
/// `RuntimeStoreError::Unsupported`.
fn persist_auth_oauth_flow_snapshot(
&self,
snapshot_json: &[u8],
) -> Result<(), RuntimeStoreError> {
let _ = snapshot_json;
Err(RuntimeStoreError::Unsupported(
"persist_auth_oauth_flow_snapshot".into(),
))
}
/// Synchronous hook that returns the stored OAuth flow snapshot bytes, if any.
///
/// The default body returns `RuntimeStoreError::Unsupported`.
fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
Err(RuntimeStoreError::Unsupported(
"load_auth_oauth_flow_snapshot".into(),
))
}
/// Synchronous hook that takes an `AuthOAuthFlowSnapshotUpdate` callback.
/// Presumably the store passes the current snapshot to the callback and
/// stores what it returns; confirm with implementors.
///
/// The default body never invokes the callback and returns
/// `RuntimeStoreError::Unsupported`.
fn update_auth_oauth_flow_snapshot(
&self,
_update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
) -> Result<(), RuntimeStoreError> {
Err(RuntimeStoreError::Unsupported(
"update_auth_oauth_flow_snapshot".into(),
))
}
/// Required. Presumably persists the snapshot bytes in `session_delta` for
/// `runtime_id`; confirm with implementors.
async fn commit_session_snapshot(
&self,
runtime_id: &LogicalRuntimeId,
session_delta: SessionDelta,
) -> Result<(), RuntimeStoreError>;
/// Optional variant of a session snapshot commit that also receives a
/// `TranscriptRewriteCommit`.
///
/// The default body ignores all arguments and returns
/// `RuntimeStoreError::Unsupported`.
async fn commit_session_transcript_rewrite_snapshot(
&self,
runtime_id: &LogicalRuntimeId,
session_delta: SessionDelta,
commit: &meerkat_core::TranscriptRewriteCommit,
) -> Result<(), RuntimeStoreError> {
let _ = (runtime_id, session_delta, commit);
Err(RuntimeStoreError::Unsupported(
"commit_session_transcript_rewrite_snapshot".into(),
))
}
/// Required. Takes an optional session delta, a run boundary receipt, a
/// batch of input state records and an optional session store key.
/// Presumably all of these are applied as one atomic unit, and
/// `session_store_key` is what `RuntimeStoreError::SessionKeyMismatch`
/// is checked against; confirm with implementors.
async fn atomic_apply(
&self,
runtime_id: &LogicalRuntimeId,
session_delta: Option<SessionDelta>,
receipt: RunBoundaryReceipt,
input_updates: Vec<InputStatePersistenceRecord>,
session_store_key: Option<meerkat_core::types::SessionId>,
) -> Result<(), RuntimeStoreError>;
/// Required. Returns the stored input states for `runtime_id`.
async fn load_input_states(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<Vec<StoredInputState>, RuntimeStoreError>;
/// Required. Looks up a boundary receipt by runtime id, run id and sequence
/// number; `Ok(None)` presumably means no such receipt is stored.
async fn load_boundary_receipt(
&self,
runtime_id: &LogicalRuntimeId,
run_id: &RunId,
sequence: u64,
) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
/// Required. Returns the stored session snapshot bytes for `runtime_id`, if any.
async fn load_session_snapshot(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
/// Required. Presumably removes the stored session snapshot for
/// `runtime_id`; confirm with implementors.
async fn clear_session_snapshot(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<(), RuntimeStoreError>;
/// Required. Presumably a compare-and-swap: stores `replacement` only when
/// the stored snapshot equals `expected_current`, with the returned `bool`
/// reporting whether the swap happened; confirm with implementors.
async fn replace_session_snapshot_if_current(
&self,
runtime_id: &LogicalRuntimeId,
expected_current: &[u8],
replacement: Vec<u8>,
) -> Result<bool, RuntimeStoreError>;
/// Required. Presumably a conditional clear: removes the stored snapshot
/// only when it equals `expected_current`, with the returned `bool`
/// reporting whether it was removed; confirm with implementors.
async fn clear_session_snapshot_if_current(
&self,
runtime_id: &LogicalRuntimeId,
expected_current: &[u8],
) -> Result<bool, RuntimeStoreError>;
/// Required. Presumably persists a single input state record for
/// `runtime_id`; confirm with implementors.
async fn persist_input_state(
&self,
runtime_id: &LogicalRuntimeId,
state: &InputStatePersistenceRecord,
) -> Result<(), RuntimeStoreError>;
/// Required. Looks up one stored input state by runtime id and input id.
async fn load_input_state(
&self,
runtime_id: &LogicalRuntimeId,
input_id: &InputId,
) -> Result<Option<StoredInputState>, RuntimeStoreError>;
/// Required. Returns the raw machine lifecycle record bytes for
/// `runtime_id`, if any. `load_machine_lifecycle` in this module decodes
/// these bytes as the JSON wire record.
async fn load_machine_lifecycle_record(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
/// Required. Receives a `MachineLifecycleCommit` together with a slice of
/// input state records. Presumably both are persisted together; confirm
/// with implementors.
async fn commit_machine_lifecycle(
&self,
runtime_id: &LogicalRuntimeId,
commit: MachineLifecycleCommit,
input_states: &[InputStatePersistenceRecord],
) -> Result<(), RuntimeStoreError>;
/// Optional hook that receives a `PersistedOpsSnapshot` for `runtime_id`.
///
/// The default body ignores its arguments and returns
/// `RuntimeStoreError::Unsupported`.
async fn persist_ops_lifecycle(
&self,
runtime_id: &LogicalRuntimeId,
snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
) -> Result<(), RuntimeStoreError> {
let _ = (runtime_id, snapshot);
Err(RuntimeStoreError::Unsupported(
"persist_ops_lifecycle".into(),
))
}
/// Optional hook that returns the `PersistedOpsSnapshot` for `runtime_id`, if any.
///
/// The default body ignores its argument and returns
/// `RuntimeStoreError::Unsupported`.
async fn load_ops_lifecycle(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
let _ = runtime_id;
Err(RuntimeStoreError::Unsupported("load_ops_lifecycle".into()))
}
/// Optional hook, presumably for removing the ops lifecycle snapshot of
/// `runtime_id`; confirm with implementors.
///
/// The default body ignores its argument and returns
/// `RuntimeStoreError::Unsupported`.
async fn delete_ops_lifecycle(
&self,
runtime_id: &LogicalRuntimeId,
) -> Result<(), RuntimeStoreError> {
let _ = runtime_id;
Err(RuntimeStoreError::Unsupported(
"delete_ops_lifecycle".into(),
))
}
}
pub use memory::InMemoryRuntimeStore;
#[cfg(feature = "sqlite-store")]
pub use sqlite::SqliteRuntimeStore;