use std::fs;
use std::path::Path;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
use crate::db;
use crate::memory::file::write_memory_file;
use crate::memory::governance::GovernanceDecisionView;
use crate::paths::state::StateLayout;
use crate::repo::marker as repo_marker;
use crate::state::{
compiled as compiled_state, projection_metadata,
protected_write::{self, AppendWriteAuthority, AppendWriteOutcome, ExclusiveWriteOptions},
session,
};
/// One memory file together with the text it should hold once a memory op is applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MemoryFileMutation {
/// Scope label of the file; this module only copies it into `StagedMemoryOpView::target_scope`.
pub(crate) scope: String,
/// Path of the memory file that is read and written when the plan is applied.
pub(crate) path: String,
/// Complete contents written to `path` when the plan is applied.
pub(crate) next_contents: String,
}
/// Serializable description of a memory write.
///
/// The plan is stored as JSON on the queued memory-op record and decoded again when an
/// unreconciled record is replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MemoryOpPlan {
/// File that is always written when the plan is applied.
pub(crate) target: MemoryFileMutation,
/// Optional second file; when present it is written after `target`.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) source: Option<MemoryFileMutation>,
/// Locality whose compiled state is refreshed after a write. When absent, the locality id
/// from the repository marker is used if a marker can be loaded.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) refresh_locality_id: Option<String>,
/// Entry ids recorded on the queue record at reconcile time and echoed in the staged view.
/// Defaults to an empty list when missing from the stored JSON.
#[serde(default)]
pub(crate) authored_entry_ids: Vec<String>,
/// NOTE(review): not read anywhere in this module; presumably the governance decision
/// attached to the write — confirm against callers.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) governance: Option<GovernanceDecisionView>,
}
/// Serializable summary of what happened to a staged memory op.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct StagedMemoryOpView {
/// Identifier of the memory op this view describes.
pub(crate) op_id: String,
/// Result of the staging/reconcile attempt.
pub(crate) outcome: AppendWriteOutcome,
/// Display form of the state database path reported by the layout.
pub(crate) native_state_path: String,
/// Scope of the target file, taken from the plan.
pub(crate) target_scope: String,
/// Path of the target file.
pub(crate) target_path: String,
/// Path of the optional source file; omitted from the output when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) source_path: Option<String>,
/// Entry ids associated with the op; omitted from the output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) authored_entry_ids: Vec<String>,
/// Explanation attached to conflict outcomes; omitted from the output when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) message: Option<String>,
}
/// Reports whether the session stored for `layout` is in the autonomous lifecycle.
///
/// Returns `Ok(false)` when no session can be loaded for the layout.
///
/// # Errors
/// Propagates any failure from loading the session.
pub(crate) fn queue_required_for_autonomous(layout: &StateLayout) -> Result<bool> {
    match session::load_for_layout(layout)? {
        Some(state) => Ok(state.lifecycle() == session::SessionLifecycle::Autonomous),
        None => Ok(false),
    }
}
/// Computes a fingerprint of `value`: the hex digest of its JSON serialization.
///
/// # Errors
/// Fails when `value` cannot be serialized to JSON.
pub(crate) fn fingerprint<T: Serialize>(value: &T) -> Result<String> {
    let encoded = serde_json::to_vec(value)?;
    let digest = hex_digest(encoded.as_slice());
    Ok(digest)
}
/// Chooses the identifier for a memory op.
///
/// An `explicit_id` is returned unchanged. Otherwise the id is derived as
/// `memory_op_{prefix}_{fp}`, where `fp` is the first 16 bytes of `request_fingerprint`.
///
/// The fingerprint is sliced with a checked lookup: when it is shorter than 16 bytes, or byte
/// 16 does not fall on a character boundary, the whole fingerprint is used instead of
/// panicking.
pub(crate) fn stable_op_id(
    prefix: &str,
    request_fingerprint: &str,
    explicit_id: Option<&str>,
) -> String {
    match explicit_id {
        Some(id) => id.to_owned(),
        None => {
            // `str::get` returns `None` rather than panicking on an out-of-range or
            // mid-character index.
            let short = request_fingerprint
                .get(..16)
                .unwrap_or(request_fingerprint);
            format!("memory_op_{prefix}_{short}")
        }
    }
}
/// Loads the report snapshot stored for an already reconciled memory op.
///
/// Returns `Ok(None)` when the state database file does not exist, when no record is stored
/// under `op_id`, when the record's fingerprint differs from `request_fingerprint`, when the
/// record has not been reconciled, or when it carries no snapshot.
///
/// # Errors
/// Fails when the database cannot be opened or queried, or when the stored snapshot is not
/// valid JSON.
pub(crate) fn load_replayed_snapshot(
    layout: &StateLayout,
    op_id: &str,
    request_fingerprint: &str,
) -> Result<Option<Value>> {
    if !layout.state_db_path().exists() {
        return Ok(None);
    }

    let pods_root = layout.pods_root();
    let state_db = db::StateDb::open(&layout.state_db_path(), Some(&pods_root))?;

    // Only a reconciled record for this exact request may be replayed.
    let record = match db::memory_ops::get_by_id(state_db.conn(), op_id)? {
        Some(found) if found.reconciled && found.request_fingerprint == request_fingerprint => {
            found
        }
        _ => return Ok(None),
    };

    match record.snapshot_json.as_deref() {
        None => Ok(None),
        Some(raw) => serde_json::from_str(raw)
            .with_context(|| format!("failed to decode queued memory-op snapshot for `{op_id}`"))
            .map(Some),
    }
}
pub(crate) fn stage_and_reconcile(
repo_root: &Path,
layout: &StateLayout,
command: &str,
op_id: &str,
request_fingerprint: &str,
write_options: &ExclusiveWriteOptions,
plan: &MemoryOpPlan,
) -> Result<StagedMemoryOpView> {
if let Some(conflict) = protected_write::authorize_append_surface_write(
layout,
"memory_op_queue",
write_options,
AppendWriteAuthority::OwnerOnly,
)? {
return Ok(StagedMemoryOpView {
op_id: op_id.to_owned(),
outcome: conflict.outcome,
native_state_path: layout.state_db_path().display().to_string(),
target_scope: plan.target.scope.clone(),
target_path: plan.target.path.clone(),
source_path: plan.source.as_ref().map(|source| source.path.clone()),
authored_entry_ids: plan.authored_entry_ids.clone(),
message: Some(conflict.message),
});
}
let pods_root = layout.pods_root();
let db = db::StateDb::open(&layout.state_db_path(), Some(&pods_root))?;
if let Some(existing) = db::memory_ops::get_by_id(db.conn(), op_id)? {
if existing.request_fingerprint != request_fingerprint {
return staged_view_from_record(
layout,
&existing,
AppendWriteOutcome::DuplicateIdConflict,
Some(format!(
"memory-op id `{op_id}` is already bound to a different queued memory write"
)),
);
}
if existing.reconciled {
return staged_view_from_record(
layout,
&existing,
AppendWriteOutcome::IdempotentNoop,
None,
);
}
let stored_plan: MemoryOpPlan = serde_json::from_str(&existing.plan_json)
.with_context(|| format!("failed to decode queued memory-op plan for `{op_id}`"))?;
let outcome = apply_plan(repo_root, layout, &stored_plan)?;
db::memory_ops::mark_reconciled(
db.conn(),
op_id,
session::now_epoch_s()?,
append_outcome_label(outcome),
&serde_json::to_string(&stored_plan.authored_entry_ids)?,
&stored_plan.target.path,
stored_plan
.source
.as_ref()
.map(|source| source.path.as_str()),
)?;
return Ok(StagedMemoryOpView {
op_id: op_id.to_owned(),
outcome,
native_state_path: layout.state_db_path().display().to_string(),
target_scope: stored_plan.target.scope,
target_path: stored_plan.target.path,
source_path: stored_plan.source.map(|source| source.path),
authored_entry_ids: stored_plan.authored_entry_ids,
message: None,
});
}
let now = session::now_epoch_s()?;
db::memory_ops::insert_staged(
db.conn(),
&db::memory_ops::MemoryOpRecord {
id: op_id.to_owned(),
command: command.to_owned(),
request_fingerprint: request_fingerprint.to_owned(),
plan_json: serde_json::to_string(plan)?,
staged_at_epoch_s: now,
updated_at_epoch_s: now,
actor_id: write_options.actor_id.clone(),
session_id: write_options.session_id.clone(),
reconciled: false,
outcome: None,
authored_entry_ids_json: "[]".to_owned(),
authored_target_path: None,
authored_source_path: None,
snapshot_json: None,
},
)?;
let outcome = apply_plan(repo_root, layout, plan)?;
db::memory_ops::mark_reconciled(
db.conn(),
op_id,
session::now_epoch_s()?,
append_outcome_label(outcome),
&serde_json::to_string(&plan.authored_entry_ids)?,
&plan.target.path,
plan.source.as_ref().map(|source| source.path.as_str()),
)?;
Ok(StagedMemoryOpView {
op_id: op_id.to_owned(),
outcome,
native_state_path: layout.state_db_path().display().to_string(),
target_scope: plan.target.scope.clone(),
target_path: plan.target.path.clone(),
source_path: plan.source.as_ref().map(|source| source.path.clone()),
authored_entry_ids: plan.authored_entry_ids.clone(),
message: None,
})
}
/// Serializes `report` to JSON and stores it as the snapshot of memory op `op_id`.
///
/// The update is stamped with the current epoch seconds.
///
/// # Errors
/// Fails when the database cannot be opened or updated, when the clock cannot be read, or
/// when `report` cannot be serialized.
pub(crate) fn persist_report_snapshot<T: Serialize>(
    layout: &StateLayout,
    op_id: &str,
    report: &T,
) -> Result<()> {
    let pods_root = layout.pods_root();
    let state_db = db::StateDb::open(&layout.state_db_path(), Some(&pods_root))?;

    let updated_at = session::now_epoch_s()?;
    let snapshot_json = serde_json::to_string(report)?;
    db::memory_ops::update_snapshot(state_db.conn(), op_id, updated_at, &snapshot_json)?;
    Ok(())
}
/// Applies `plan` right away by delegating to `apply_plan`.
///
/// Unlike `stage_and_reconcile`, this entry point does not read or write the memory-op queue
/// records. Returns the outcome reported by `apply_plan` and propagates its errors.
pub(crate) fn apply_direct_plan(
repo_root: &Path,
layout: &StateLayout,
plan: &MemoryOpPlan,
) -> Result<AppendWriteOutcome> {
apply_plan(repo_root, layout, plan)
}
/// Brings the files named by `plan` to their planned contents.
///
/// The target file (and the source file, when the plan has one) is read first; a file that
/// does not exist is treated as empty. If everything on disk already equals the planned
/// contents, nothing is written and `IdempotentNoop` is returned. Otherwise the target and
/// then the source are rewritten (both, even if only one differed), compiled state is
/// refreshed, and `Applied` is returned.
///
/// # Errors
/// Propagates read, write and refresh failures.
fn apply_plan(
    repo_root: &Path,
    layout: &StateLayout,
    plan: &MemoryOpPlan,
) -> Result<AppendWriteOutcome> {
    let target = &plan.target;
    let on_disk_target = read_optional_file(Path::new(&target.path))?;

    // A plan without a source file has nothing further to compare.
    let source_in_sync = match plan.source.as_ref() {
        Some(source) => read_optional_file(Path::new(&source.path))? == source.next_contents,
        None => true,
    };

    if source_in_sync && on_disk_target == target.next_contents {
        // NOTE(review): this early return also skips the compiled-state refresh below.
        return Ok(AppendWriteOutcome::IdempotentNoop);
    }

    write_memory_file(Path::new(&target.path), &target.next_contents)?;
    if let Some(source) = plan.source.as_ref() {
        write_memory_file(Path::new(&source.path), &source.next_contents)?;
    }

    refresh_compiled_state_after_write(layout, repo_root, plan.refresh_locality_id.as_deref())?;
    Ok(AppendWriteOutcome::Applied)
}
/// Refreshes compiled state after memory files were written.
///
/// Uses `locality_id` when given; otherwise falls back to the locality id from the repository
/// marker. When neither is available, nothing is refreshed. A failure to record projection
/// metadata is reported through `warn_record_error` and does not fail the call.
///
/// # Errors
/// Propagates failures from loading the repository marker and from the refresh itself.
fn refresh_compiled_state_after_write(
    layout: &StateLayout,
    repo_root: &Path,
    locality_id: Option<&str>,
) -> Result<()> {
    // Shared by both ways of resolving the locality.
    let refresh_for = |id: &str| -> Result<()> {
        let compiled = compiled_state::refresh_after_write(repo_root, layout, id)?;
        if let Err(error) = projection_metadata::record_for_compiled_store(layout, &compiled) {
            projection_metadata::warn_record_error(layout, &error);
        }
        Ok(())
    };

    match locality_id {
        Some(id) => refresh_for(id),
        None => match repo_marker::load(repo_root)? {
            Some(marker) => refresh_for(&marker.locality_id),
            None => Ok(()),
        },
    }
}
fn read_optional_file(path: &Path) -> Result<String> {
match fs::read_to_string(path) {
Ok(contents) => Ok(contents),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(String::new()),
Err(error) => Err(error).with_context(|| format!("failed to read {}", path.display())),
}
}
fn staged_view_from_record(
layout: &StateLayout,
record: &db::memory_ops::MemoryOpRecord,
outcome: AppendWriteOutcome,
message: Option<String>,
) -> Result<StagedMemoryOpView> {
let plan: MemoryOpPlan = serde_json::from_str(&record.plan_json)
.with_context(|| format!("failed to decode queued memory-op plan for `{}`", record.id))?;
let authored_entry_ids = serde_json::from_str(&record.authored_entry_ids_json)
.with_context(|| format!("failed to decode authored_entry_ids for `{}`", record.id))?;
Ok(StagedMemoryOpView {
op_id: record.id.clone(),
outcome,
native_state_path: layout.state_db_path().display().to_string(),
target_scope: plan.target.scope,
target_path: record
.authored_target_path
.clone()
.unwrap_or(plan.target.path),
source_path: record
.authored_source_path
.clone()
.or_else(|| plan.source.map(|source| source.path)),
authored_entry_ids,
message,
})
}
/// Maps an outcome to the snake_case label passed to `mark_reconciled`.
///
/// The match is deliberately exhaustive so that a new variant forces a label to be chosen.
fn append_outcome_label(outcome: AppendWriteOutcome) -> &'static str {
    let label = match outcome {
        // Conflict-style outcomes.
        AppendWriteOutcome::DuplicateIdConflict => "duplicate_id_conflict",
        AppendWriteOutcome::OwnershipConflict => "ownership_conflict",
        AppendWriteOutcome::StaleSession => "stale_session",
        AppendWriteOutcome::UnsupportedMultiwriter => "unsupported_multiwriter",
        // Outcomes produced by applying a plan.
        AppendWriteOutcome::IdempotentNoop => "idempotent_noop",
        AppendWriteOutcome::Applied => "applied",
    };
    label
}
/// Returns the SHA-256 digest of `bytes`, formatted as lowercase hexadecimal.
fn hex_digest(bytes: &[u8]) -> String {
    let digest = Sha256::digest(bytes);
    format!("{digest:x}")
}