use std::fs;
use std::path::{Path, PathBuf};
use chrono::Utc;
use crate::codec::sha256_stream::{sha256_hex, HashingWriter};
use crate::codec::tar_zst::{pack_files, PackEntry};
use crate::error::SnapshotError;
use crate::git_capture::{enumerate_git_files, read_head_meta};
use crate::id::SnapshotId;
use crate::manifest::{
ArtifactKind, ArtifactMeta, GitMeta, Manifest, SchemaVersions, ToolVersions, BUNDLE_FORMAT,
MANIFEST_VERSION,
};
use crate::memdir::enumerate_memdir_files;
use crate::meta::SnapshotMeta;
use crate::redaction::{redact_staging_dir, DefaultRedactionPolicy};
use crate::request::SnapshotRequest;
use crate::sqlite_backup::backup_named;
use crate::tenant_path::{
bundle_sha256_sibling, snapshot_bundle_path, snapshots_dir, validate_agent_id, validate_tenant,
};
use super::snapshotter::LocalFsSnapshotter;
// SQLite databases that a snapshot may capture, as `(file stem, artifact kind)`.
// `build_bundle` looks for `<stem>.sqlite` in the agent's sqlite directory,
// silently skips any that do not exist, and stores the rest in the bundle as
// `sqlite/<stem>.sqlite` tagged with the paired kind.
const SQLITE_DBS: &[(&str, ArtifactKind)] = &[
("long_term", ArtifactKind::SqliteLongTerm),
("vector", ArtifactKind::SqliteVector),
("concepts", ArtifactKind::SqliteConcepts),
("compactions", ArtifactKind::SqliteCompactions),
];
/// Takes one snapshot for the agent/tenant named in `req`.
///
/// The ids are validated first, then the per-agent lock is acquired (bounded by
/// the snapshotter's lock timeout) and held until this function returns. A
/// fresh [`SnapshotId`] is allocated, a `.staging-<id>` directory is created
/// inside the tenant's snapshots directory, and the real work is delegated to
/// `build_bundle`.
///
/// The staging directory is removed on a best-effort basis whether the build
/// succeeded or failed; a failure to remove it is ignored so it never masks
/// the build result.
///
/// # Errors
/// Returns a [`SnapshotError`] when validation, locking, directory creation,
/// path resolution, or the bundle build itself fails.
pub(super) async fn run_snapshot(
    s: &LocalFsSnapshotter,
    req: SnapshotRequest,
) -> Result<SnapshotMeta, SnapshotError> {
    let agent_id = validate_agent_id(&req.agent_id)?.to_string();
    let tenant = validate_tenant(&req.tenant)?.to_string();

    // Bound to a named guard so the lock lives for the whole function body.
    let _lock = s.locks().acquire(&agent_id, s.lock_timeout()).await?;

    let snapshots_root = snapshots_dir(s.state_root(), &tenant, &agent_id)?;
    fs::create_dir_all(&snapshots_root)?;

    let id = SnapshotId::new();
    let wants_encryption = req.encrypt.is_some();
    let bundle_path =
        snapshot_bundle_path(s.state_root(), &tenant, &agent_id, id, wants_encryption)?;

    let staging_name = format!(".staging-{}", id.as_filename());
    let staging_dir = snapshots_root.join(staging_name);
    fs::create_dir_all(&staging_dir)?;

    let outcome = build_bundle(s, &agent_id, &tenant, &req, id, &bundle_path, &staging_dir).await;

    // Best-effort cleanup: the build outcome is what the caller cares about.
    let _ = fs::remove_dir_all(&staging_dir);
    outcome
}
/// Stages, hashes, and packs every artifact of one snapshot into `bundle_path`.
///
/// Steps, in order:
/// 1. Back up each existing database listed in `SQLITE_DBS` into
///    `staging_dir/sqlite` (missing databases are skipped).
/// 2. Write the extract cursor and the last dream run as pretty JSON under
///    `staging_dir/state`, for whichever of the two the state provider returns.
/// 3. Add the files reported by the memdir and git enumerators.
/// 4. When `req.redact_secrets` is set, run the default redaction policy over
///    `staging_dir`.
/// 5. Hash every artifact, derive `bundle_sha256` from the concatenated
///    per-artifact hex digests, and write `manifest.json` into the staging dir.
/// 6. Pack the manifest plus all artifacts into `<bundle file name>.partial`,
///    write the digest returned by `pack_pipeline` to the sibling digest file,
///    and rename the partial file onto `bundle_path`.
///
/// If step 6 fails at any point, the partial file and the sibling digest file
/// are removed (best-effort) so a failed snapshot leaves nothing behind in the
/// snapshots directory.
///
/// # Errors
/// Returns a [`SnapshotError`] for any failing I/O, backup, state capture,
/// enumeration, redaction, serialization, encryption-key, or packing step.
async fn build_bundle(
    s: &LocalFsSnapshotter,
    agent_id: &str,
    tenant: &str,
    req: &SnapshotRequest,
    id: SnapshotId,
    bundle_path: &Path,
    staging_dir: &Path,
) -> Result<SnapshotMeta, SnapshotError> {
    let encrypted = req.encrypt.is_some();
    let memdir = s.path_resolver().memdir(agent_id, tenant);
    let sqlite_dir = s.path_resolver().sqlite_dir(agent_id, tenant);
    // Falls back to placeholder metadata when HEAD cannot be read.
    let git_meta = read_head_meta_or_placeholder(&memdir);

    let sqlite_staging = staging_dir.join("sqlite");
    fs::create_dir_all(&sqlite_staging)?;
    fs::create_dir_all(staging_dir.join("state"))?;

    let mut staged: Vec<StagedArtifact> = Vec::new();

    // --- SQLite databases -------------------------------------------------
    for (name, kind) in SQLITE_DBS {
        let src = sqlite_dir.join(format!("{name}.sqlite"));
        if !src.exists() {
            continue;
        }
        let (dst, _size) = backup_named(&src, &sqlite_staging, name).await?;
        staged.push(StagedArtifact {
            on_disk: dst,
            in_bundle: format!("sqlite/{name}.sqlite"),
            kind: *kind,
        });
    }

    // --- Provider state (each entry is optional) ---------------------------
    let extract_cursor = s
        .state_provider()
        .capture_extract_cursor(&agent_id.to_string())
        .await?;
    if let Some(value) = extract_cursor {
        let path = staging_dir.join("state/extract_cursor.json");
        fs::write(&path, serde_json::to_vec_pretty(&value)?)?;
        staged.push(StagedArtifact {
            on_disk: path,
            in_bundle: "state/extract_cursor.json".into(),
            kind: ArtifactKind::StateExtractCursor,
        });
    }
    let dream_run = s
        .state_provider()
        .capture_last_dream_run(&agent_id.to_string())
        .await?;
    if let Some(value) = dream_run {
        let path = staging_dir.join("state/dream_run.json");
        fs::write(&path, serde_json::to_vec_pretty(&value)?)?;
        staged.push(StagedArtifact {
            on_disk: path,
            in_bundle: "state/dream_run.json".into(),
            kind: ArtifactKind::StateDreamRun,
        });
    }

    // --- Memdir and git files, referenced at the enumerator-returned paths --
    for (src, in_bundle) in enumerate_memdir_files(&memdir)? {
        staged.push(StagedArtifact {
            on_disk: src,
            in_bundle,
            kind: ArtifactKind::MemoryFile,
        });
    }
    for (src, in_bundle) in enumerate_git_files(&memdir)? {
        staged.push(StagedArtifact {
            on_disk: src,
            in_bundle,
            kind: ArtifactKind::GitBundle,
        });
    }

    // NOTE(review): redaction is run over `staging_dir` only. The memdir and
    // git artifacts above are packed from the paths the enumerators returned,
    // which may lie outside `staging_dir` — confirm they are meant to be
    // exempt from redaction (or are covered some other way).
    let redaction_report = if req.redact_secrets {
        let policy = DefaultRedactionPolicy::new();
        redact_staging_dir(staging_dir, &policy)?
    } else {
        None
    };
    // Captured up front so the report itself can be moved into the manifest.
    let redactions_applied = redaction_report.is_some();

    // --- Per-artifact size + digest (each file is read fully into memory) ---
    let mut artifacts = Vec::with_capacity(staged.len());
    for art in &staged {
        let bytes = fs::read(&art.on_disk)?;
        artifacts.push(ArtifactMeta {
            path_in_bundle: art.in_bundle.clone(),
            kind: art.kind,
            size_bytes: bytes.len() as u64,
            sha256: sha256_hex(&bytes),
        });
    }

    // The manifest-level digest is the hash of all artifact hex digests
    // concatenated in staging order.
    let mut concat = String::with_capacity(artifacts.len() * 64);
    for a in &artifacts {
        concat.push_str(&a.sha256);
    }
    let bundle_sha256 = sha256_hex(concat.as_bytes());

    let encryption_meta = build_encryption_meta(&req.encrypt)?;
    let manifest = Manifest {
        manifest_version: MANIFEST_VERSION,
        bundle_format: BUNDLE_FORMAT.into(),
        snapshot_id: id,
        agent_id: agent_id.to_string(),
        tenant: tenant.to_string(),
        label: req.label.clone(),
        created_at_ms: Utc::now().timestamp_millis(),
        created_by: req.created_by.clone(),
        schema_versions: SchemaVersions::CURRENT,
        git: git_meta,
        artifacts,
        redactions: redaction_report,
        encryption: encryption_meta,
        tool_versions: ToolVersions::current(),
        bundle_sha256,
    };
    let manifest_path = staging_dir.join("manifest.json");
    fs::write(&manifest_path, serde_json::to_vec_pretty(&manifest)?)?;

    // The manifest is always the first entry in the archive.
    let mut entries: Vec<PackEntry> = Vec::with_capacity(staged.len() + 1);
    entries.push(PackEntry {
        path_in_bundle: "manifest.json",
        source: &manifest_path,
    });
    for art in &staged {
        entries.push(PackEntry {
            path_in_bundle: &art.in_bundle,
            source: &art.on_disk,
        });
    }

    let partial_name = format!(
        "{}.partial",
        bundle_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("bundle")
    );
    let partial = bundle_path.with_file_name(partial_name);
    let sha_sibling = bundle_sha256_sibling(bundle_path);

    // Write to the partial file, publish the sibling digest, then rename the
    // partial into place, so the bundle only appears under its final name
    // once it is complete and its digest file already exists.
    let publish = || -> Result<(), SnapshotError> {
        let f = fs::File::create(&partial)?;
        let hashing = HashingWriter::new(f);
        let file_digest = pack_pipeline(&entries, hashing, &req.encrypt)?;
        fs::write(&sha_sibling, &file_digest)?;
        fs::rename(&partial, bundle_path)?;
        Ok(())
    };
    if let Err(err) = publish() {
        // Best-effort cleanup so a failed snapshot does not leave a stray
        // `.partial` or digest file next to the real bundles. Either file may
        // not exist yet, hence the ignored results.
        let _ = fs::remove_file(&partial);
        let _ = fs::remove_file(&sha_sibling);
        return Err(err);
    }

    let bundle_size_bytes = fs::metadata(bundle_path)?.len();
    Ok(SnapshotMeta {
        id,
        agent_id: agent_id.to_string(),
        tenant: tenant.to_string(),
        label: req.label.clone(),
        created_at_ms: manifest.created_at_ms,
        bundle_path: bundle_path.to_path_buf(),
        bundle_size_bytes,
        bundle_sha256: manifest.bundle_sha256.clone(),
        // Always `Some`: when HEAD was unreadable this is the placeholder oid.
        git_oid: Some(manifest.git.head_oid.clone()),
        schema_versions: SchemaVersions::CURRENT,
        encrypted,
        redactions_applied,
    })
}
/// Parses a list of age recipient strings, ignoring exact-duplicate strings.
///
/// Only the first occurrence of each string is parsed; later repeats are
/// skipped with a debug log carrying their index. Parsing stops at the first
/// string that fails to parse.
///
/// # Errors
/// * `SnapshotError::Encryption("recipient at index {i}: …")` when the string
///   at index `i` cannot be parsed.
/// * `SnapshotError::Encryption("empty recipients")` when no recipient remains
///   (i.e. `strings` was empty).
#[cfg(feature = "snapshot-encryption")]
fn resolve_recipients(strings: &[String]) -> Result<Vec<age::x25519::Recipient>, SnapshotError> {
    // Borrowed keys are enough: the set never outlives `strings`.
    let mut already_seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
    let mut parsed = Vec::with_capacity(strings.len());

    for (i, raw) in strings.iter().enumerate() {
        let first_occurrence = already_seen.insert(raw.as_str());
        if first_occurrence {
            let recipient = crate::codec::age_codec::parse_recipient(raw)
                .map_err(|e| SnapshotError::Encryption(format!("recipient at index {i}: {e}")))?;
            parsed.push(recipient);
        } else {
            tracing::debug!(
                target: "memory.snapshot.encryption",
                index = i,
                "skipping duplicate recipient string"
            );
        }
    }

    if parsed.is_empty() {
        Err(SnapshotError::Encryption("empty recipients".into()))
    } else {
        Ok(parsed)
    }
}
/// Builds the manifest's encryption section for the requested key, if any.
///
/// Returns `Ok(None)` when no key was supplied. With the
/// `snapshot-encryption` feature enabled, the key's recipient string(s) are
/// parsed and the result records the scheme `"age"` together with one
/// fingerprint per resolved recipient.
///
/// # Errors
/// * With the feature enabled: any recipient parsing/resolution error.
/// * With the feature disabled: always `SnapshotError::Encryption` when a key
///   is supplied.
fn build_encryption_meta(
    key: &Option<crate::request::EncryptionKey>,
) -> Result<Option<crate::manifest::EncryptionMeta>, SnapshotError> {
    let key = match key {
        Some(requested) => requested,
        None => return Ok(None),
    };
    #[cfg(feature = "snapshot-encryption")]
    {
        use crate::codec::age_codec;
        use crate::request::EncryptionKey;

        let recipients: Vec<age::x25519::Recipient> = match key {
            EncryptionKey::AgePublicKey(single) => vec![age_codec::parse_recipient(single)?],
            EncryptionKey::AgePublicKeys(many) => resolve_recipients(many)?,
        };
        let meta = crate::manifest::EncryptionMeta {
            scheme: String::from("age"),
            recipients_fingerprint: recipients
                .iter()
                .map(|recipient| age_codec::fingerprint(recipient))
                .collect(),
        };
        Ok(Some(meta))
    }
    #[cfg(not(feature = "snapshot-encryption"))]
    {
        // Keeps `key` "used" in builds without the feature.
        let _ = key;
        Err(SnapshotError::Encryption(
            "AgePublicKey* supplied but `snapshot-encryption` feature is disabled".into(),
        ))
    }
}
/// Packs `entries` through `hashing` and returns the hex digest of the bytes
/// that reached the underlying file.
///
/// * No key: the archive is written straight into the hashing writer.
/// * Key supplied, `snapshot-encryption` enabled: the archive is written
///   through an age encrypting writer layered on top of the hashing writer, so
///   the returned digest covers the encrypted output.
/// * Key supplied, feature disabled: fails without writing anything.
///
/// # Errors
/// * `SnapshotError::Io` wrapping `"pack: …"` when archiving fails.
/// * Recipient parsing or encryption-writer errors in the encrypted path.
/// * `SnapshotError::Encryption` when encryption is requested but the feature
///   is compiled out.
fn pack_pipeline(
    entries: &[PackEntry<'_>],
    hashing: HashingWriter<fs::File>,
    encrypt: &Option<crate::request::EncryptionKey>,
) -> Result<String, SnapshotError> {
    // `let … else` gives the encrypted path a proven-present key, replacing
    // the former `is_none()` check followed by `unwrap()`.
    let Some(key) = encrypt else {
        let hashing = pack_files(entries, hashing)
            .map_err(|e| SnapshotError::Io(std::io::Error::other(format!("pack: {e}"))))?;
        let (_inner, file_digest, _bytes) = hashing.finalize_hex();
        return Ok(file_digest);
    };
    #[cfg(feature = "snapshot-encryption")]
    {
        let recipients: Vec<age::x25519::Recipient> = match key {
            crate::request::EncryptionKey::AgePublicKey(s) => {
                vec![crate::codec::age_codec::parse_recipient(s)?]
            }
            crate::request::EncryptionKey::AgePublicKeys(strings) => resolve_recipients(strings)?,
        };
        let enc_writer = crate::codec::age_codec::encrypt_writer(hashing, recipients)?;
        let enc_writer = pack_files(entries, enc_writer)
            .map_err(|e| SnapshotError::Io(std::io::Error::other(format!("pack: {e}"))))?;
        // Finishing the encrypting writer hands back the hashing writer it wrapped.
        let hashing_back = enc_writer.finish()?;
        let (_inner, file_digest, _bytes) = hashing_back.finalize_hex();
        Ok(file_digest)
    }
    #[cfg(not(feature = "snapshot-encryption"))]
    {
        // Keeps `key` "used" in builds without the feature.
        let _ = key;
        Err(SnapshotError::Encryption(
            "encryption requested but `snapshot-encryption` feature is disabled".into(),
        ))
    }
}
/// Reads git HEAD metadata for `memdir`, substituting a fixed placeholder when
/// it cannot be read.
///
/// Any error from `read_head_meta` is discarded. The placeholder uses an
/// all-zero 40-character oid, the subject `"(no memdir)"`, and a timestamp
/// of `0`.
fn read_head_meta_or_placeholder(memdir: &Path) -> GitMeta {
    read_head_meta(memdir).unwrap_or_else(|_| GitMeta {
        head_oid: "0".repeat(40),
        head_subject: "(no memdir)".into(),
        head_author: "nexo-memory-snapshot <ops@example.com>".into(),
        head_ts_ms: 0,
    })
}
/// One file queued for inclusion in a snapshot bundle.
struct StagedArtifact {
/// Where the file's bytes are read from when hashing and packing.
on_disk: PathBuf,
/// Path the file is stored under inside the bundle (also recorded in the manifest).
in_bundle: String,
/// Artifact category recorded in the manifest entry.
kind: ArtifactKind,
}
// Integration-style tests that drive snapshots through `LocalFsSnapshotter`
// against temporary directories seeded with a git-backed memdir and SQLite files.
#[cfg(test)]
mod tests {
use super::*;
use crate::snapshotter::MemorySnapshotter;
use crate::tenant_path::snapshots_dir;
use git2::{IndexAddOption, Repository, Signature};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::{ConnectOptions, Connection};
use std::str::FromStr;
// Creates (including parent directories) a SQLite database at `path` holding
// a table `t` with `rows` rows, then closes the connection.
async fn seed_sqlite(path: &Path, rows: i64) {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).unwrap();
}
let opts = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))
.unwrap()
.create_if_missing(true);
let mut conn = opts.connect().await.unwrap();
sqlx::query("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
.execute(&mut conn)
.await
.unwrap();
for i in 0..rows {
sqlx::query("INSERT INTO t (id, v) VALUES (?, ?)")
.bind(i)
.bind(format!("row-{i}"))
.execute(&mut conn)
.await
.unwrap();
}
conn.close().await.unwrap();
}
// Initializes a git repository at `memdir`, writes two markdown files, and
// records them in a single parentless commit on HEAD.
fn seed_memdir(memdir: &Path) {
fs::create_dir_all(memdir).unwrap();
let repo = Repository::init(memdir).unwrap();
fs::write(memdir.join("MEMORY.md"), b"# index\n- topic-a\n").unwrap();
fs::write(memdir.join("topic-a.md"), b"# a\nseed\n").unwrap();
let mut index = repo.index().unwrap();
index
.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)
.unwrap();
index.write().unwrap();
let tree_id = index.write_tree().unwrap();
let tree = repo.find_tree(tree_id).unwrap();
let sig = Signature::now("operator", "ops@example.com").unwrap();
repo.commit(Some("HEAD"), &sig, &sig, "snapshot:seed", &tree, &[])
.unwrap();
}
// Builds a snapshotter rooted at `state_root`, with memdirs under
// `agents-memdir` and SQLite files under `agents-sqlite`.
fn build_snapshotter(state_root: &Path) -> LocalFsSnapshotter {
LocalFsSnapshotter::builder()
.state_root(state_root)
.memdir_root(state_root.join("agents-memdir"))
.sqlite_root(state_root.join("agents-sqlite"))
.build()
.unwrap()
}
// An unencrypted snapshot yields a non-empty `.tar.zst` bundle, a 64-char
// manifest digest, a git oid, and a sibling digest file with 64 characters.
#[tokio::test]
async fn happy_path_produces_bundle_and_sibling_hash() {
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
let memdir = tmp.path().join("agents-memdir/ana");
seed_memdir(&memdir);
seed_sqlite(&tmp.path().join("agents-sqlite/ana/long_term.sqlite"), 10).await;
let req = SnapshotRequest::cli("ana", "default");
let meta = s.snapshot(req).await.unwrap();
assert!(meta.bundle_path.exists(), "{}", meta.bundle_path.display());
assert!(meta.bundle_path.to_string_lossy().ends_with(".tar.zst"));
assert!(meta.bundle_size_bytes > 0);
assert_eq!(meta.bundle_sha256.len(), 64);
assert!(meta.git_oid.is_some());
let sib = bundle_sha256_sibling(&meta.bundle_path);
let body = fs::read_to_string(&sib).unwrap();
assert_eq!(body.trim().len(), 64);
}
// The bundle path must be located inside the tenant/agent snapshots directory.
#[tokio::test]
async fn snapshot_path_lives_under_tenant_root() {
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
seed_memdir(&tmp.path().join("agents-memdir/ana"));
let req = SnapshotRequest::cli("ana", "acme");
let meta = s.snapshot(req).await.unwrap();
let dir = snapshots_dir(tmp.path(), "acme", "ana").unwrap();
assert!(meta.bundle_path.starts_with(&dir));
}
// A second acquire for the same agent while the first guard is held fails
// with `SnapshotError::Concurrent`.
// NOTE(review): despite the name, this exercises `AgentLockMap` directly and
// never calls `snapshot`.
#[tokio::test]
async fn second_snapshot_with_held_lock_returns_concurrent() {
use crate::local_fs::lock::AgentLockMap;
let map = AgentLockMap::new();
let agent: crate::id::AgentId = "ana".into();
let _g = map
.acquire(&agent, std::time::Duration::from_millis(50))
.await
.unwrap();
let err = map
.acquire(&agent, std::time::Duration::from_millis(50))
.await
.unwrap_err();
assert!(matches!(err, SnapshotError::Concurrent(ref a) if a == &agent));
}
// A tenant id containing uppercase characters is rejected, and the error
// text mentions the tenant or the allowed character class.
#[tokio::test]
async fn rejects_invalid_tenant_id() {
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
seed_memdir(&tmp.path().join("agents-memdir/ana"));
let mut req = SnapshotRequest::cli("ana", "default");
req.tenant = "BAD-Tenant".into(); let err = s.snapshot(req).await.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("tenant") || msg.contains("[a-z0-9_-]"),
"{msg}"
);
}
// A bundle encrypted to several recipients can be restored with either
// identity: identity A restores the first bundle, identity B restores a
// second bundle whose other recipient is a throwaway key.
#[cfg(feature = "snapshot-encryption")]
#[tokio::test]
async fn pack_pipeline_handles_age_public_keys_variant() {
use crate::request::{DecryptionIdentity, EncryptionKey, RestoreRequest};
use age::secrecy::ExposeSecret;
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
seed_memdir(&tmp.path().join("agents-memdir/ana"));
seed_sqlite(&tmp.path().join("agents-sqlite/ana/long_term.sqlite"), 3).await;
let id_a = age::x25519::Identity::generate();
let id_b = age::x25519::Identity::generate();
let recipients = vec![id_a.to_public().to_string(), id_b.to_public().to_string()];
let id_a_path = tmp.path().join("identity-a.txt");
let id_b_path = tmp.path().join("identity-b.txt");
std::fs::write(&id_a_path, id_a.to_string().expose_secret()).unwrap();
std::fs::write(&id_b_path, id_b.to_string().expose_secret()).unwrap();
let mut snap_req = SnapshotRequest::cli("ana", "default");
snap_req.encrypt = Some(EncryptionKey::AgePublicKeys(recipients));
let meta = s.snapshot(snap_req).await.unwrap();
assert!(meta.encrypted);
assert!(meta.bundle_path.to_string_lossy().ends_with(".tar.zst.age"));
let mut req_a = RestoreRequest::new("ana", "default", &meta.bundle_path);
req_a.auto_pre_snapshot = false;
req_a.decrypt = Some(DecryptionIdentity::AgeIdentityFile(id_a_path));
let report_a = s.restore(req_a).await.unwrap();
assert!(!report_a.dry_run);
let recipients2 = vec![
age::x25519::Identity::generate().to_public().to_string(),
id_b.to_public().to_string(),
];
let mut snap_req2 = SnapshotRequest::cli("ana", "default");
snap_req2.encrypt = Some(EncryptionKey::AgePublicKeys(recipients2));
let meta2 = s.snapshot(snap_req2).await.unwrap();
let mut req_b = RestoreRequest::new("ana", "default", &meta2.bundle_path);
req_b.auto_pre_snapshot = false;
req_b.decrypt = Some(DecryptionIdentity::AgeIdentityFile(id_b_path));
let report_b = s.restore(req_b).await.unwrap();
assert!(!report_b.dry_run);
}
// Snapshots with three distinct recipients and checks the result is encrypted.
// NOTE(review): the manifest's fingerprint list is never inspected; the final
// assertion only checks the length of the locally built input vector, so this
// does not yet verify what the test name promises.
#[cfg(feature = "snapshot-encryption")]
#[tokio::test]
async fn build_encryption_meta_lists_all_fingerprints() {
use crate::request::EncryptionKey;
use age::secrecy::ExposeSecret;
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
seed_memdir(&tmp.path().join("agents-memdir/ana"));
seed_sqlite(&tmp.path().join("agents-sqlite/ana/long_term.sqlite"), 1).await;
let recipients: Vec<String> = (0..3)
.map(|_| {
let id = age::x25519::Identity::generate();
let _ = id.to_string().expose_secret(); id.to_public().to_string()
})
.collect();
let mut snap_req = SnapshotRequest::cli("ana", "default");
snap_req.encrypt = Some(EncryptionKey::AgePublicKeys(recipients.clone()));
let meta = s.snapshot(snap_req).await.unwrap();
assert!(meta.encrypted);
assert_eq!(recipients.len(), 3);
}
// Supplying the same recipient string three times must still produce an
// encrypted snapshot.
// NOTE(review): only success and `meta.encrypted` are asserted; the number of
// recipients actually used is not checked.
#[cfg(feature = "snapshot-encryption")]
#[tokio::test]
async fn pack_pipeline_dedupes_duplicate_recipients() {
use crate::request::EncryptionKey;
let tmp = tempfile::tempdir().unwrap();
let s = build_snapshotter(tmp.path());
seed_memdir(&tmp.path().join("agents-memdir/ana"));
seed_sqlite(&tmp.path().join("agents-sqlite/ana/long_term.sqlite"), 1).await;
let recipient = age::x25519::Identity::generate().to_public().to_string();
let recipients = vec![recipient.clone(), recipient.clone(), recipient];
let mut snap_req = SnapshotRequest::cli("ana", "default");
snap_req.encrypt = Some(EncryptionKey::AgePublicKeys(recipients));
let meta = s.snapshot(snap_req).await.unwrap();
assert!(meta.encrypted);
}
}