use anyhow::Result;
use clap::Parser;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace::prelude::BlobStore;
use triblespace::prelude::BlobStoreGet;
use triblespace::prelude::BlobStorePut;
use triblespace::prelude::BranchStore;
use triblespace::prelude::View;
use triblespace_core::blob::schemas::longstring::LongString;
use triblespace_core::blob::ToBlob;
use triblespace_core::id::id_hex;
use triblespace_core::id::Id;
use triblespace_core::repo::pile::Pile;
use triblespace_core::repo::Repository;
use triblespace_core::trible::TribleSet;
use triblespace_core::value::schemas::hash::{Blake3, Handle, Hash};
use triblespace_core::value::Value;
use super::signing::load_signing_key;
use triblespace_core::repo::BlobStoreMeta;
type BranchNameHandle = Value<Handle<Blake3, LongString>>;
#[allow(non_upper_case_globals)]
const MAGIC_MARKER_BLOB: Id = id_hex!("1E08B022FF2F47B6EBACF1D68EB35D96");
#[allow(non_upper_case_globals)]
const MAGIC_MARKER_BRANCH: Id = id_hex!("2BC991A7F5D5D2A3A468C53B0AA03504");
#[allow(non_upper_case_globals)]
const MAGIC_MARKER_BRANCH_TOMBSTONE: Id = id_hex!("E888CC787202D2AE4C654BFE9699C430");
const RECORD_LEN: u64 = 64;
#[derive(Parser)]
pub enum Command {
List {
path: PathBuf,
#[arg(long)]
all: bool,
#[arg(long)]
deleted: bool,
},
Create {
pile: PathBuf,
name: String,
#[arg(long)]
signing_key: Option<PathBuf>,
},
Inspect {
pile: PathBuf,
branch: String,
},
Delete {
pile: PathBuf,
branch: String,
},
Set {
pile: PathBuf,
branch: String,
meta: String,
#[arg(long)]
expected: Option<String>,
},
Reflog {
pile: PathBuf,
branch: String,
#[arg(long, default_value_t = 50)]
limit: usize,
},
Export {
#[arg(long)]
from_pile: PathBuf,
#[arg(long)]
branch: String,
#[arg(long)]
to_pile: PathBuf,
},
Stats {
pile: PathBuf,
branch: String,
#[arg(long, default_value_t = false)]
full: bool,
},
MergeImport {
#[arg(long)]
from_pile: PathBuf,
#[arg(long)]
from_id: String,
#[arg(long)]
to_pile: PathBuf,
#[arg(long)]
to_id: String,
#[arg(long)]
signing_key: Option<PathBuf>,
},
Consolidate {
pile: PathBuf,
#[arg(num_args = 0..)]
branches: Vec<String>,
#[arg(long)]
out_name: Option<String>,
#[arg(long)]
dry_run: bool,
#[arg(long)]
delete_sources: bool,
#[arg(long)]
by_name: bool,
#[arg(long, conflicts_with = "by_name")]
by_name_include_deleted: bool,
#[arg(long)]
signing_key: Option<PathBuf>,
},
Log {
pile: PathBuf,
branch: String,
#[arg(long, default_value_t = 50)]
limit: usize,
#[arg(long)]
oneline: bool,
},
Describe {
pile: PathBuf,
branch: String,
#[arg(long)]
entities: bool,
},
Show {
pile: PathBuf,
commit: String,
},
Rename {
pile: PathBuf,
branch: String,
new_name: String,
#[arg(long)]
signing_key: Option<PathBuf>,
},
}
pub fn run(cmd: Command) -> Result<()> {
match cmd {
Command::List { path, all, deleted } => {
use triblespace_core::repo::pile::Pile;
if all || deleted {
let mut pile: Pile<Blake3> = Pile::open(&path)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let records = scan_pile_records(&path)?;
let states = collapse_branch_states(&records);
let mut rows: Vec<(Id, &BranchState)> = states.iter().map(|(id, s)| (*id, s)).collect();
rows.sort_by_key(|(id, _)| *id);
for (id, state) in rows {
if deleted && state.kind != RecordKind::Tombstone {
continue;
}
let meta_handle = match state.kind {
RecordKind::Set => state.meta,
RecordKind::Tombstone => state.last_set,
};
let kind = match state.kind {
RecordKind::Set => "set",
RecordKind::Tombstone => "delete",
};
let mut name = "-".to_string();
let mut head_str = "-".to_string();
if let Some(mh) = meta_handle {
if reader.metadata(mh)?.is_some() {
if let Ok(meta_set) = reader.get::<TribleSet, _>(mh) {
if let Ok(Some(n)) = load_branch_name(&reader, &meta_set) {
name = n;
}
if let Some(h) = extract_repo_head(&meta_set) {
head_str = format!("blake3:{}", hex::encode(h.raw));
}
}
}
}
println!("{id:X}\t{kind}\t{head_str}\t{name}");
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
} else {
let mut pile: Pile<Blake3> = Pile::open(&path)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let iter = pile.branches()?;
let head_attr = triblespace_core::repo::head.id();
let mut rows: Vec<(String, Id, String)> = Vec::new();
for branch in iter {
let id = branch?;
let meta_handle = match pile.head(id)? {
Some(handle) => handle,
None => {
rows.push(("<deleted>".to_string(), id, "-".to_string()));
continue;
}
};
let (name, head) = match reader.get::<TribleSet, _>(meta_handle) {
Ok(meta) => {
let name_attr = triblespace_core::metadata::name.id();
let mut name_handle: Option<BranchNameHandle> = None;
let mut head_handle: Option<Value<Handle<Blake3, SimpleArchive>>> =
None;
for t in meta.iter() {
if t.a() == &name_attr {
let h: BranchNameHandle = *t.v();
if name_handle.replace(h).is_some() {
name_handle = None;
break;
}
} else if t.a() == &head_attr {
let h: Value<Handle<Blake3, SimpleArchive>> = *t.v();
if head_handle.replace(h).is_some() {
head_handle = None;
}
}
}
let name = match name_handle {
None => "<unnamed>".to_string(),
Some(handle) => match reader.get::<View<str>, _>(handle) {
Ok(view) => view.as_ref().to_string(),
Err(_) => format!(
"<name blob missing ({})>",
hex::encode_upper(&handle.raw[..4])
),
},
};
let head = match head_handle {
None => "-".to_string(),
Some(handle) => format!("blake3:{}", hex::encode(handle.raw)),
};
(name, head)
}
Err(_) => (
format!(
"<metadata blob missing ({})>",
hex::encode_upper(&meta_handle.raw[..4])
),
"-".to_string(),
),
};
rows.push((name, id, head));
}
rows.sort_by(|(a_name, a_id, _), (b_name, b_id, _)| {
a_name.cmp(b_name).then_with(|| a_id.cmp(b_id))
});
for (name, id, head) in rows {
println!("{id:X}\t{head}\t{name}");
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
}
Command::Create {
pile,
name,
signing_key,
} => {
use triblespace_core::repo::pile::Pile;
use triblespace_core::repo::Repository;
use triblespace_core::value::schemas::hash::Blake3;
let pile: Pile<Blake3> = Pile::open(&pile)?;
let key = load_signing_key(&signing_key)?;
let mut repo = Repository::new(pile, key, TribleSet::new())?;
let res = (|| -> Result<(), anyhow::Error> {
let branch_id = repo
.create_branch(&name, None)
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
println!("{:#X}", *branch_id);
Ok(())
})();
let close_res = repo
.into_storage()
.close()
.map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Inspect { pile, branch } => {
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace::prelude::valueschemas::Handle;
use triblespace_core::repo::pile::Pile;
use triblespace_core::trible::TribleSet;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::schemas::hash::Hash;
use triblespace_core::value::Value;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
let branch_id = parse_branch_id_hex(&branch)?;
let meta_handle = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch not found"))?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let meta_present = reader.metadata(meta_handle)?.is_some();
let (name_val, head_val, head_err): (
Option<String>,
Option<Value<Handle<Blake3, SimpleArchive>>>,
Option<String>,
) = if meta_present {
match reader.get::<TribleSet, SimpleArchive>(meta_handle) {
Ok(meta) => {
let mut head_val: Option<Value<Handle<Blake3, SimpleArchive>>> = None;
let repo_head_attr = triblespace_core::repo::head.id();
for t in meta.iter() {
if t.a() == &repo_head_attr {
let h = *t.v::<Handle<Blake3, SimpleArchive>>();
head_val = Some(h);
}
}
let name_val = load_branch_name(&reader, &meta)?;
(name_val, head_val, None)
}
Err(e) => (None, None, Some(format!("decode failed: {e:?}"))),
}
} else {
(None, None, None)
};
let id_hex = format!("{branch_id:X}");
let meta_hash: Value<Hash<Blake3>> = Handle::to_hash(meta_handle);
let meta_hex: String = meta_hash.from_value();
println!("Id: {id_hex}");
if let Some(nstr) = name_val.clone() {
println!("Name: {nstr}");
}
println!(
"Meta: {meta_hex} [{}]{}",
if meta_present { "present" } else { "missing" },
head_err
.as_deref()
.map(|e| format!(" ({e})"))
.unwrap_or_default()
);
if let Some(h) = head_val {
let head_hash: Value<Hash<Blake3>> = Handle::to_hash(h);
let head_hex: String = head_hash.from_value();
let present = reader.metadata(h)?.is_some();
println!(
"Head: {head_hex} [{}]",
if present { "present" } else { "missing" }
);
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Delete { pile, branch } => {
use triblespace_core::repo::pile::Pile;
use triblespace_core::value::schemas::hash::Blake3;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
let branch_id = parse_branch_id_hex(&branch)?;
let old = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch not found"))?;
match pile.update(branch_id, Some(old), None)? {
triblespace_core::repo::PushResult::Success() => {
println!("deleted branch {branch_id:X}");
Ok(())
}
triblespace_core::repo::PushResult::Conflict(_) => {
anyhow::bail!("branch {branch_id:X} advanced concurrently; rerun delete")
}
}
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Set {
pile,
branch,
meta,
expected,
} => {
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace::prelude::valueschemas::Handle;
use triblespace_core::repo::pile::Pile;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::Value;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
let branch_id = parse_branch_id_hex(&branch)?;
let new_meta: Value<Handle<Blake3, SimpleArchive>> = parse_blake3_handle(&meta)?;
let expected_old: Option<Value<Handle<Blake3, SimpleArchive>>> = match expected {
Some(s) => parse_blake3_handle_opt(&s)?,
None => pile.head(branch_id)?,
};
match pile.update(branch_id, expected_old, Some(new_meta))? {
triblespace_core::repo::PushResult::Success() => {
println!(
"set branch {bid:X} meta blake3:{meta}",
bid = branch_id,
meta = hex::encode(new_meta.raw)
);
Ok(())
}
triblespace_core::repo::PushResult::Conflict(existing) => {
let got = existing
.map(|h| format!("blake3:{}", hex::encode(h.raw)))
.unwrap_or_else(|| "-".to_string());
anyhow::bail!("branch head changed concurrently; current={got}")
}
}
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Reflog {
pile,
branch,
limit,
} => {
use triblespace_core::repo::pile::Pile;
let branch_id = parse_branch_id_hex(&branch)?;
let mut pile_reader: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile_reader.refresh()?;
let reader = pile_reader
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let all_records = scan_pile_records(&pile)?;
let branch_records: Vec<&RawBranchRecord> = all_records
.iter()
.filter(|r| r.branch_id == branch_id)
.collect();
let start = branch_records.len().saturating_sub(limit);
let tail = &branch_records[start..];
for (idx, rec) in tail.iter().rev().enumerate() {
let offset = rec.offset;
let kind = match rec.kind {
RecordKind::Set => "set",
RecordKind::Tombstone => "delete",
};
let meta = match rec.meta_handle {
None => "-".to_string(),
Some(h) => format!("blake3:{}", hex::encode(h.raw)),
};
let mut head_str = "-".to_string();
let mut head_state = "-";
let mut name: Option<String> = None;
let meta_state;
if let Some(mh) = rec.meta_handle {
let present = reader.metadata(mh)?.is_some();
meta_state = if present { "present" } else { "missing" };
if present {
if let Ok(meta_set) = reader.get::<TribleSet, _>(mh) {
name = load_branch_name(&reader, &meta_set).ok().flatten();
if let Some(h) = extract_repo_head(&meta_set) {
head_str = format!("blake3:{}", hex::encode(h.raw));
head_state = if reader.metadata(h)?.is_some() {
"present"
} else {
"missing"
};
}
}
}
} else {
meta_state = "-";
}
let name = name.as_deref().unwrap_or("-");
println!(
"{idx}\toffset={offset}\t{kind}\tmeta={meta}\tmeta[{meta_state}]\thead={head_str}\thead[{head_state}]\tname={name}"
);
}
Ok(())
})();
let close_res = pile_reader
.close()
.map_err(|e| anyhow::anyhow!("close pile: {e:?}"));
res.and(close_res)?;
}
Command::Export {
from_pile,
branch,
to_pile,
} => {
use triblespace_core::repo;
use triblespace_core::repo::pile::Pile;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::schemas::hash::Handle;
use triblespace_core::value::Value;
let bid = parse_branch_id_hex(&branch)?;
let mut src: Pile<Blake3> = Pile::open(&from_pile)?;
let mut dst: Pile<Blake3> = match Pile::open(&to_pile) {
Ok(pile) => pile,
Err(err) => {
let _ = src.close();
return Err(err.into());
}
};
let res = (|| -> Result<(), anyhow::Error> {
let src_meta = src
.head(bid)?
.ok_or_else(|| anyhow::anyhow!("source branch head not found"))?;
use std::collections::HashMap;
use triblespace_core::value::VALUE_LEN;
let mut mapping: HashMap<[u8; VALUE_LEN], Value<Handle<Blake3, _>>> =
HashMap::new();
let src_reader = src
.reader()
.map_err(|e| anyhow::anyhow!("src pile reader error: {e:?}"))?;
let handles = repo::reachable(&src_reader, std::iter::once(src_meta.transmute()));
let mut visited: usize = 0;
let mut stored: usize = 0;
for r in repo::transfer(&src_reader, &mut dst, handles) {
match r {
Ok((src_h, dst_h)) => {
visited += 1;
stored += 1;
mapping.insert(src_h.raw, dst_h);
}
Err(e) => return Err(anyhow::anyhow!("transfer failed: {e}")),
}
}
let dst_meta = mapping
.get(&src_meta.raw)
.ok_or_else(|| {
anyhow::anyhow!("destination meta handle not found after transfer")
})?
.clone();
let old = dst.head(bid)?;
let res = dst
.update(bid, old, Some(dst_meta.transmute()))
.map_err(|e| anyhow::anyhow!("destination branch update failed: {e:?}"))?;
match res {
triblespace_core::repo::PushResult::Success() => {
println!(
"export: copied visited={} stored={} and set branch {:#X}",
visited, stored, bid
);
}
triblespace_core::repo::PushResult::Conflict(existing) => {
println!("export: copied visited={} stored={} but branch update conflicted: existing={:?}", visited, stored, existing);
}
}
Ok(())
})();
let close_src = src.close().map_err(|e| anyhow::anyhow!("{e:?}"));
let close_dst = dst.close().map_err(|e| anyhow::anyhow!("{e:?}"));
match res {
Ok(()) => {
close_src?;
close_dst?;
Ok(())
}
Err(err) => {
if let Err(close_err) = close_src {
eprintln!("warning: failed to close source pile cleanly: {close_err:#}");
}
if let Err(close_err) = close_dst {
eprintln!(
"warning: failed to close destination pile cleanly: {close_err:#}"
);
}
Err(err)
}
}?;
}
Command::Stats { pile, branch, full } => {
use std::collections::{BTreeSet, HashSet};
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace::prelude::valueschemas::Handle;
use triblespace_core::repo::pile::Pile;
use triblespace_core::trible::TribleSet;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::schemas::hash::Hash;
use triblespace_core::value::Value;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let branch_id = parse_branch_id_hex(&branch)?;
let repo_parent_attr = triblespace_core::repo::parent.id();
let repo_content_attr = triblespace_core::repo::content.id();
let meta_handle = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch not found"))?;
let mut head_opt: Option<Value<Handle<Blake3, SimpleArchive>>> = None;
if reader.metadata(meta_handle)?.is_some() {
if let Ok(meta) = reader.get::<TribleSet, SimpleArchive>(meta_handle) {
let repo_head_attr = triblespace_core::repo::head.id();
for t in meta.iter() {
if t.a() == &repo_head_attr {
head_opt = Some(*t.v::<Handle<Blake3, SimpleArchive>>());
break;
}
}
}
}
let head = head_opt.ok_or_else(|| anyhow::anyhow!("branch has no head set"))?;
let mut visited: BTreeSet<String> = BTreeSet::new();
let mut stack: Vec<Value<Handle<Blake3, SimpleArchive>>> = vec![head];
let mut commit_count: usize = 0;
let mut total_triples_accum: usize = 0;
let mut content_blob_count: usize = 0;
let mut content_bytes_total: u64 = 0;
let mut content_misaligned_count: usize = 0;
let mut unioned = TribleSet::new();
while let Some(h) = stack.pop() {
let hh: Value<Hash<Blake3>> = Handle::to_hash(h);
let hex: String = hh.from_value();
if !visited.insert(hex.clone()) {
continue;
}
commit_count += 1;
if reader.metadata(h)?.is_none() {
continue;
}
let meta: TribleSet = match reader.get::<TribleSet, SimpleArchive>(h) {
Ok(m) => m,
Err(_) => continue,
};
let mut parents: Vec<Value<Handle<Blake3, SimpleArchive>>> = Vec::new();
let mut content_handles: Vec<Value<Handle<Blake3, SimpleArchive>>> = Vec::new();
for t in meta.iter() {
if t.a() == &repo_content_attr {
let c = *t.v::<Handle<Blake3, SimpleArchive>>();
content_handles.push(c);
} else if t.a() == &repo_parent_attr {
parents.push(*t.v::<Handle<Blake3, SimpleArchive>>());
}
}
for c in content_handles {
let Some(content_meta) = reader.metadata(c)? else {
continue;
};
content_blob_count = content_blob_count.saturating_add(1);
content_bytes_total =
content_bytes_total.saturating_add(content_meta.length);
let triples_from_length =
(content_meta.length / 64).try_into().unwrap_or(usize::MAX);
total_triples_accum =
total_triples_accum.saturating_add(triples_from_length);
if content_meta.length % 64 != 0 {
content_misaligned_count = content_misaligned_count.saturating_add(1);
}
if full {
let content: TribleSet = match reader.get::<TribleSet, SimpleArchive>(c)
{
Ok(s) => s,
Err(_) => continue,
};
unioned += content;
}
}
for p in parents {
stack.push(p);
}
}
println!("Branch: {branch_id:X}");
println!("Commits: {commit_count}");
println!("Content blobs (accum): {content_blob_count}");
println!("Content bytes (accum): {content_bytes_total}");
println!("Triples (accum): {total_triples_accum}");
if content_misaligned_count > 0 {
println!("Warning: {content_misaligned_count} content blob(s) had non-64-byte-aligned length.");
}
if full {
let unique_triples = unioned.len();
let mut entities: HashSet<Id> = HashSet::new();
let mut attributes: HashSet<Id> = HashSet::new();
for t in unioned.iter() {
entities.insert(*t.e());
attributes.insert(*t.a());
}
println!("Triples (unique): {unique_triples}");
println!("Entities: {}", entities.len());
println!("Attributes: {}", attributes.len());
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::MergeImport {
from_pile,
from_id,
to_pile,
to_id,
signing_key,
} => {
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace_core::repo;
use triblespace_core::repo::pile::Pile;
use triblespace_core::repo::Repository;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::schemas::hash::Handle;
use triblespace_core::value::Value;
struct CopyStats {
visited: usize,
stored: usize,
}
let src_bid = parse_branch_id_hex(&from_id)?;
let dst_bid = parse_branch_id_hex(&to_id)?;
let key = load_signing_key(&signing_key)?;
let mut src: Pile<Blake3> = Pile::open(&from_pile)?;
let dst_pile: Pile<Blake3> = match Pile::open(&to_pile) {
Ok(pile) => pile,
Err(err) => {
let _ = src.close();
return Err(err.into());
}
};
let mut repo = Repository::new(dst_pile, key, TribleSet::new())?;
let result = (|| -> Result<CopyStats, anyhow::Error> {
let src_head: Value<Handle<Blake3, SimpleArchive>> = src
.head(src_bid)?
.ok_or_else(|| anyhow::anyhow!("source branch head not found"))?;
let src_reader = src
.reader()
.map_err(|e| anyhow::anyhow!("src pile reader error: {e:?}"))?;
let handles = repo::reachable(&src_reader, std::iter::once(src_head.transmute()));
let mut visited: usize = 0;
let mut stored: usize = 0;
for r in repo::transfer(&src_reader, repo.storage_mut(), handles) {
match r {
Ok((_src_h, _dst_h)) => {
visited += 1;
stored += 1;
}
Err(e) => return Err(anyhow::anyhow!("transfer failed: {e}")),
}
}
let mut ws = repo
.pull(dst_bid)
.map_err(|e| anyhow::anyhow!("failed to open destination branch: {e:?}"))?;
ws.merge_commit(src_head)
.map_err(|e| anyhow::anyhow!("merge failed: {e:?}"))?;
while let Some(mut incoming) = repo
.try_push(&mut ws)
.map_err(|e| anyhow::anyhow!("push failed: {e:?}"))?
{
incoming
.merge(&mut ws)
.map_err(|e| anyhow::anyhow!("merge conflict: {e:?}"))?;
ws = incoming;
}
Ok(CopyStats { visited, stored })
})();
let close_src = src.close().map_err(|e| anyhow::anyhow!("{e:?}"));
let close_dst = repo
.into_storage()
.close()
.map_err(|e| anyhow::anyhow!("{e:?}"));
match result {
Ok(stats) => {
close_src?;
close_dst?;
println!(
"merge-import: copied visited={} stored={} and attached source head to destination branch",
stats.visited, stats.stored
);
Ok(())
}
Err(err) => {
if let Err(close_err) = close_src {
eprintln!("warning: failed to close source pile cleanly: {close_err:#}");
}
if let Err(close_err) = close_dst {
eprintln!(
"warning: failed to close destination pile cleanly: {close_err:#}"
);
}
Err(err)
}
}?;
}
Command::Consolidate {
pile,
branches,
out_name,
dry_run,
delete_sources,
by_name_include_deleted,
by_name,
signing_key,
} => {
use std::collections::{BTreeMap, HashSet};
let key = load_signing_key(&signing_key)?;
if by_name_include_deleted {
if out_name.is_some() {
eprintln!("warning: --out-name is ignored when --by-name-include-deleted is set");
}
let pile_path = pile;
let pile_store: Pile<Blake3> = Pile::open(&pile_path)?;
let mut repo = Repository::new(pile_store, key.clone(), TribleSet::new())?;
let res = (|| -> Result<(), anyhow::Error> {
repo.storage_mut().refresh()?;
let reader = repo
.storage_mut()
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let records = scan_pile_records(&pile_path)?;
let states = collapse_branch_states(&records);
let n_active = states.values().filter(|s| s.kind == RecordKind::Set).count();
let n_deleted = states.values().filter(|s| s.kind == RecordKind::Tombstone).count();
println!(
"scanning pile: found {} unique branch IDs ({} active, {} tombstoned)",
states.len(), n_active, n_deleted
);
let mut groups: BTreeMap<String, Vec<(Id, Option<Value<Handle<Blake3, SimpleArchive>>>)>> = BTreeMap::new();
for (bid, state) in &states {
let meta_handle = match state.kind {
RecordKind::Set => state.meta,
RecordKind::Tombstone => state.last_set,
};
let Some(mh) = meta_handle else {
groups.entry("<unnamed>".to_string()).or_default().push((*bid, None));
continue;
};
if reader.metadata(mh)?.is_none() {
eprintln!("warning: metadata blob missing for branch {bid:X}");
groups.entry("<unnamed>".to_string()).or_default().push((*bid, None));
continue;
}
let meta_set = match reader.get::<TribleSet, SimpleArchive>(mh) {
Ok(ms) => ms,
Err(_) => {
eprintln!("warning: failed to read metadata for branch {bid:X}");
groups.entry("<unnamed>".to_string()).or_default().push((*bid, None));
continue;
}
};
let name = load_branch_name(&reader, &meta_set)
.ok()
.flatten()
.unwrap_or_else(|| "<unnamed>".to_string());
let head = extract_repo_head(&meta_set);
groups.entry(name).or_default().push((*bid, head));
}
let statuses: HashMap<Id, &str> = states.iter().map(|(id, s)| {
let label = match s.kind {
RecordKind::Set => "active",
RecordKind::Tombstone => "deleted",
};
(*id, label)
}).collect();
let created_count = consolidate_groups(
&groups, &statuses, &reader, &mut repo, &key,
dry_run, delete_sources,
)?;
if dry_run {
println!("\ndry-run: no changes were made");
} else {
println!("\ncreated {created_count} consolidated branch(es)");
}
Ok(())
})();
let close_res = repo
.into_storage()
.close()
.map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
} else if by_name {
if out_name.is_some() {
eprintln!("warning: --out-name is ignored when --by-name is set");
}
let pile_store: Pile<Blake3> = Pile::open(&pile)?;
let mut repo = Repository::new(pile_store, key.clone(), TribleSet::new())?;
let res = (|| -> Result<(), anyhow::Error> {
repo.storage_mut().refresh()?;
let reader = repo
.storage_mut()
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let mut groups: std::collections::BTreeMap<String, Vec<(Id, Option<Value<Handle<Blake3, SimpleArchive>>>)>> = std::collections::BTreeMap::new();
let branch_ids: Vec<Id> = repo.storage_mut().branches()?
.collect::<Result<Vec<_>, _>>()?;
println!("found {} active branch(es)", branch_ids.len());
for bid in &branch_ids {
let Some(mh) = repo.storage_mut().head(*bid)? else {
continue;
};
if reader.metadata(mh)?.is_none() {
eprintln!("warning: metadata blob missing for branch {bid:X}");
groups.entry("<unnamed>".to_string()).or_default().push((*bid, None));
continue;
}
let meta_set = match reader.get::<TribleSet, SimpleArchive>(mh) {
Ok(ms) => ms,
Err(_) => {
eprintln!("warning: failed to read metadata for branch {bid:X}");
groups.entry("<unnamed>".to_string()).or_default().push((*bid, None));
continue;
}
};
let name = load_branch_name(&reader, &meta_set)
.ok()
.flatten()
.unwrap_or_else(|| "<unnamed>".to_string());
let head = extract_repo_head(&meta_set);
groups.entry(name).or_default().push((*bid, head));
}
let statuses: HashMap<Id, &str> = branch_ids.iter()
.map(|bid| (*bid, "active"))
.collect();
let created_count = consolidate_groups(
&groups, &statuses, &reader, &mut repo, &key,
dry_run, delete_sources,
)?;
if dry_run {
println!("\ndry-run: no changes were made");
} else {
println!("\ncreated {created_count} consolidated branch(es)");
}
Ok(())
})();
let close_res = repo
.into_storage()
.close()
.map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
} else {
let mut seen: HashSet<Id> = HashSet::new();
let mut branch_ids: Vec<Id> = Vec::new();
for raw in branches {
let bid = parse_branch_id_hex(&raw)?;
if seen.insert(bid) {
branch_ids.push(bid);
}
}
let pile: Pile<Blake3> = Pile::open(&pile)?;
let mut repo = Repository::new(pile, key.clone(), TribleSet::new())?;
let res = (|| -> Result<(), anyhow::Error> {
repo.storage_mut().refresh()?;
let reader = repo
.storage_mut()
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let repo_head_attr = triblespace_core::repo::head.id();
let mut candidates: Vec<(Id, Option<Value<Handle<Blake3, SimpleArchive>>>)> =
Vec::new();
for bid in branch_ids {
let meta_handle = repo
.storage_mut()
.head(bid)?
.ok_or_else(|| anyhow::anyhow!("branch not found: {bid:X}"))?;
let mut head_val: Option<Value<Handle<Blake3, SimpleArchive>>> = None;
if reader.metadata(meta_handle)?.is_some() {
if let Ok(meta) = reader.get::<TribleSet, SimpleArchive>(meta_handle) {
for t in meta.iter() {
if t.a() == &repo_head_attr {
head_val = Some(*t.v::<Handle<Blake3, SimpleArchive>>());
break;
}
}
}
}
candidates.push((bid, head_val));
}
println!("found {} branch(es)", candidates.len());
for (bid, head) in &candidates {
let id_hex = format!("{bid:X}");
if let Some(h) = head {
let hh: Value<Hash<Blake3>> = Handle::to_hash(*h);
let hex: String = hh.from_value();
println!("- {id_hex} -> commit {hex}");
} else {
println!("- {id_hex} -> <no head>");
}
}
if dry_run {
println!("dry-run: no changes will be made");
return Ok(());
}
if candidates.len() == 1 {
println!("only one branch present; nothing to consolidate");
return Ok(());
}
let parents: Vec<Value<Handle<Blake3, SimpleArchive>>> =
candidates.iter().filter_map(|(_, h)| *h).collect();
if parents.is_empty() {
anyhow::bail!("no branch heads available to attach");
}
let commit_set = triblespace_core::repo::commit::commit_metadata(
&key,
parents.clone(),
None,
None,
None,
);
let commit_handle = repo
.storage_mut()
.put(commit_set.to_blob())
.map_err(|e| anyhow::anyhow!("failed to put commit blob: {e:?}"))?;
let out = out_name.unwrap_or_else(|| "consolidated".to_string());
let new_id = *repo
.create_branch_with_key(&out, Some(commit_handle), key.clone())
.map_err(|e| anyhow::anyhow!("failed to create consolidated branch: {e:?}"))?;
println!("created consolidated branch '{out}' with id {new_id:X}");
if delete_sources {
for (bid, _) in &candidates {
if let Some(old) = repo.storage_mut().head(*bid)? {
match repo.storage_mut().update(*bid, Some(old), None)? {
triblespace_core::repo::PushResult::Success() => {
println!("deleted source branch {bid:X}");
}
triblespace_core::repo::PushResult::Conflict(_) => {
eprintln!("warning: branch {bid:X} advanced concurrently; skipping delete");
}
}
}
}
}
Ok(())
})();
let close_res = repo
.into_storage()
.close()
.map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
}
Command::Log {
pile,
branch,
limit,
oneline,
} => {
use std::collections::HashSet;
use triblespace_core::repo::pile::Pile;
let branch_id = parse_branch_id_hex(&branch)?;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let branch_meta = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch not found"))?;
let branch_meta_set: TribleSet = reader
.get(branch_meta)
.map_err(|e| anyhow::anyhow!("read branch metadata: {e:?}"))?;
let commit_head = extract_repo_head(&branch_meta_set)
.ok_or_else(|| anyhow::anyhow!("branch has no commit head"))?;
let mut queue: std::collections::VecDeque<Value<Handle<Blake3, SimpleArchive>>> =
std::collections::VecDeque::new();
let mut visited: HashSet<[u8; 32]> = HashSet::new();
queue.push_back(commit_head);
let mut printed = 0usize;
while let Some(current) = queue.pop_front() {
if !visited.insert(current.raw) {
continue;
}
if printed >= limit {
break;
}
let commit_set: TribleSet = match reader.get(current) {
Ok(c) => c,
Err(_) => {
let hash: Value<Hash<Blake3>> = Handle::to_hash(current);
let hex: String = hash.from_value();
println!("{hex} <missing blob>");
printed += 1;
continue;
}
};
let info = read_commit_fields(&commit_set);
let hash: Value<Hash<Blake3>> = Handle::to_hash(current);
let hex: String = hash.from_value();
let msg = if let Some(sm) = &info.short_message {
sm.clone()
} else if let Some(mh) = info.message {
match reader.get::<View<str>, _>(mh) {
Ok(v) => {
let s = v.as_ref();
if s.len() > 72 {
format!("{}...", &s[..72])
} else {
s.to_string()
}
}
Err(_) => "<message blob missing>".to_string(),
}
} else {
"<no message>".to_string()
};
let content_count = if let Some(ch) = info.content {
match reader.get::<TribleSet, _>(ch) {
Ok(ts) => format!("{}", ts.len()),
Err(_) => "?".to_string(),
}
} else {
"0".to_string()
};
let ts_str = if let Some(ts_val) = info.timestamp {
use triblespace_core::value::schemas::time::Lower;
let lower: Lower = ts_val.try_from_value().unwrap_or(Lower(0));
let epoch = hifitime::Epoch::from_tai_duration(
hifitime::Duration::from_total_nanoseconds(lower.0));
hifitime::efmt::Formatter::new(
epoch,
hifitime::efmt::consts::ISO8601,
).to_string()
} else {
"?".to_string()
};
if oneline {
println!(
"\x1b[33m{short}\x1b[0m {ts_str} {msg}",
short = &hex[..16],
);
} else {
println!("\x1b[33mcommit {hex}\x1b[0m");
if let Some(pk) = &info.signed_by {
println!("Signed: {}", hex::encode(&pk[..8]));
}
println!("Date: {ts_str}");
if !info.parents.is_empty() {
let parent_strs: Vec<String> = info.parents.iter().map(|p| {
let ph: Value<Hash<Blake3>> = Handle::to_hash(*p);
let phex: String = ph.from_value();
phex[..16].to_string()
}).collect();
let label = if info.parents.len() > 1 { "Merge: " } else { "Parent:" };
println!("{label} {}", parent_strs.join(" "));
}
println!();
println!(" {msg}");
println!();
println!(" {content_count} tribles");
println!();
}
printed += 1;
for p in &info.parents {
queue.push_back(*p);
}
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Show { pile, commit } => {
use triblespace_core::repo::pile::Pile;
let commit_handle: Value<Handle<Blake3, SimpleArchive>> =
parse_blake3_handle(&commit)?;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let commit_set: TribleSet = reader
.get(commit_handle)
.map_err(|e| anyhow::anyhow!("read commit blob: {e:?}"))?;
let info = read_commit_fields(&commit_set);
let hash: Value<Hash<Blake3>> = Handle::to_hash(commit_handle);
let hex: String = hash.from_value();
println!("Commit: {hex}");
if let Some(sm) = &info.short_message {
println!("Short message: {sm}");
}
if let Some(mh) = info.message {
match reader.get::<View<str>, _>(mh) {
Ok(v) => println!("Message: {}", v.as_ref()),
Err(_) => println!("Message: <blob missing>"),
}
}
if let Some(pk) = &info.signed_by {
println!("Signed by: {}", hex::encode(pk));
}
if info.parents.is_empty() {
println!("Parents: (none)");
} else {
println!("Parents:");
for p in &info.parents {
let ph: Value<Hash<Blake3>> = Handle::to_hash(*p);
let phex: String = ph.from_value();
let present = reader.metadata(*p)?.is_some();
println!(
" {phex} [{}]",
if present { "present" } else { "missing" }
);
}
}
if let Some(ch) = info.content {
let ch_hash: Value<Hash<Blake3>> = Handle::to_hash(ch);
let ch_hex: String = ch_hash.from_value();
let present = reader.metadata(ch)?.is_some();
print!("Content: {ch_hex} [{}]", if present { "present" } else { "missing" });
if present {
if let Ok(ts) = reader.get::<TribleSet, _>(ch) {
use std::collections::HashSet;
let mut entities: HashSet<Id> = HashSet::new();
let mut attributes: HashSet<Id> = HashSet::new();
for t in ts.iter() {
entities.insert(*t.e());
attributes.insert(*t.a());
}
print!(
" ({} tribles, {} entities, {} attributes)",
ts.len(),
entities.len(),
attributes.len()
);
}
}
println!();
} else {
println!("Content: (none)");
}
if let Some(mh) = info.metadata {
let mh_hash: Value<Hash<Blake3>> = Handle::to_hash(mh);
let mh_hex: String = mh_hash.from_value();
let present = reader.metadata(mh)?.is_some();
println!(
"Metadata: {mh_hex} [{}]",
if present { "present" } else { "missing" }
);
} else {
println!("Metadata: (none)");
}
println!("Commit tribles: {}", commit_set.len());
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Describe {
pile,
branch,
entities,
} => {
use std::collections::HashSet;
use triblespace_core::repo::pile::Pile;
let branch_id = parse_branch_id_hex(&branch)?;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let reader = pile
.reader()
.map_err(|e| anyhow::anyhow!("pile reader error: {e:?}"))?;
let branch_meta = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch not found"))?;
let branch_meta_set: TribleSet = reader
.get(branch_meta)
.map_err(|e| anyhow::anyhow!("read branch metadata: {e:?}"))?;
let commit_head = extract_repo_head(&branch_meta_set)
.ok_or_else(|| anyhow::anyhow!("branch has no commit head"))?;
struct AttrTally {
trible_count: usize,
entity_ids: HashSet<Id>,
}
let mut tallies: HashMap<Id, AttrTally> = HashMap::new();
let mut attr_names: HashMap<Id, String> = HashMap::new();
let mut visited: HashSet<[u8; 32]> = HashSet::new();
let mut stack: Vec<Value<Handle<Blake3, SimpleArchive>>> = vec![commit_head];
let mut commit_count = 0usize;
let tag_attr = triblespace_core::metadata::tag.id();
let attr_attr = triblespace_core::metadata::attribute.id();
let name_attr = triblespace_core::metadata::name.id();
while let Some(current) = stack.pop() {
if !visited.insert(current.raw) {
continue;
}
let commit_set: TribleSet = match reader.get(current) {
Ok(c) => c,
Err(_) => continue,
};
commit_count += 1;
let info = read_commit_fields(&commit_set);
if let Some(ch) = info.content {
if let Ok(content) = reader.get::<TribleSet, _>(ch) {
for t in content.iter() {
let entry = tallies.entry(*t.a()).or_insert_with(|| AttrTally {
trible_count: 0,
entity_ids: HashSet::new(),
});
entry.trible_count += 1;
entry.entity_ids.insert(*t.e());
}
}
}
if let Some(mh) = info.metadata {
if let Ok(meta_set) = reader.get::<TribleSet, _>(mh) {
let kind_id = triblespace_core::metadata::KIND_ATTRIBUTE_USAGE;
let mut usage_entities: HashSet<Id> = HashSet::new();
for t in meta_set.iter() {
if t.a() == &tag_attr {
let v: Value<triblespace::prelude::valueschemas::GenId> =
*t.v();
if let Ok(gid) =
v.try_from_value::<triblespace_core::id::Id>()
{
if gid == kind_id {
usage_entities.insert(*t.e());
}
}
}
}
for t in meta_set.iter() {
if !usage_entities.contains(t.e()) {
continue;
}
if t.a() == &attr_attr {
let v: Value<triblespace::prelude::valueschemas::GenId> =
*t.v();
if let Ok(described_id) =
v.try_from_value::<triblespace_core::id::Id>()
{
for t2 in meta_set.iter() {
if t2.e() == t.e() && t2.a() == &name_attr {
let nh: Value<
Handle<Blake3, LongString>,
> = *t2.v();
if let Ok(view) =
reader.get::<View<str>, _>(nh)
{
attr_names.entry(described_id).or_insert_with(
|| view.as_ref().to_string(),
);
}
}
}
}
}
}
}
}
for p in &info.parents {
stack.push(*p);
}
}
println!("Commits: {commit_count}");
println!("Attributes: {}", tallies.len());
println!();
let mut sorted: Vec<(Id, &AttrTally)> =
tallies.iter().map(|(id, t)| (*id, t)).collect();
sorted.sort_by(|a, b| b.1.trible_count.cmp(&a.1.trible_count));
for (attr_id, tally) in &sorted {
let name = attr_names
.get(attr_id)
.map(|s| s.as_str())
.unwrap_or("-");
if entities {
println!(
"{attr_id:X} tribles={tc} entities={ec} {name}",
tc = tally.trible_count,
ec = tally.entity_ids.len(),
);
} else {
println!(
"{attr_id:X} tribles={tc} {name}",
tc = tally.trible_count,
);
}
}
Ok(())
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
Command::Rename {
pile,
branch,
new_name,
signing_key,
} => {
use triblespace_core::repo::pile::Pile;
use triblespace_core::repo::branch as branch_mod;
use triblespace_core::query::find;
use triblespace_core::macros::pattern;
let branch_id = parse_branch_id_hex(&branch)?;
let key = load_signing_key(&signing_key)?;
let mut pile: Pile<Blake3> = Pile::open(&pile)?;
let res = (|| -> Result<(), anyhow::Error> {
pile.refresh()?;
let mut current_meta_handle = pile
.head(branch_id)?
.ok_or_else(|| anyhow::anyhow!("branch {branch} not found"))?;
loop {
let reader = pile.reader()
.map_err(|e| anyhow::anyhow!("reader: {e:?}"))?;
let meta: TribleSet = reader.get(current_meta_handle)
.map_err(|e| anyhow::anyhow!("read branch meta: {e:?}"))?;
let head_handle: Option<Value<Handle<Blake3, SimpleArchive>>> =
find!((h: Value<_>), pattern!(&meta, [{ triblespace_core::repo::head: ?h }]))
.next()
.map(|(h,)| h);
let commit_blob = if let Some(h) = head_handle {
let commit_set: TribleSet = reader.get(h)
.map_err(|e| anyhow::anyhow!("read commit: {e:?}"))?;
Some(commit_set.to_blob())
} else {
None
};
let name_handle: BranchNameHandle = pile
.put(new_name.clone().to_blob())
.map_err(|e| anyhow::anyhow!("put name blob: {e:?}"))?;
let new_meta = branch_mod::branch_metadata(
&key,
branch_id,
name_handle,
commit_blob,
);
let new_meta_handle = pile
.put(new_meta)
.map_err(|e| anyhow::anyhow!("put branch meta: {e:?}"))?;
match pile.update(branch_id, Some(current_meta_handle), Some(new_meta_handle))? {
triblespace_core::repo::PushResult::Success() => {
println!("renamed {branch_id:X} → \"{new_name}\"");
return Ok(());
}
triblespace_core::repo::PushResult::Conflict(conflict) => {
let conflict = conflict
.ok_or_else(|| anyhow::anyhow!("branch deleted concurrently"))?;
eprintln!("CAS conflict, retrying...");
current_meta_handle = conflict;
}
}
}
})();
let close_res = pile.close().map_err(|e| anyhow::anyhow!("{e:?}"));
res.and(close_res)?;
}
}
Ok(())
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RecordKind {
Set,
Tombstone,
}
#[derive(Clone, Debug)]
struct RawBranchRecord {
offset: u64,
branch_id: Id,
kind: RecordKind,
meta_handle: Option<Value<Handle<Blake3, SimpleArchive>>>,
}
#[derive(Clone, Debug)]
struct BranchState {
kind: RecordKind,
meta: Option<Value<Handle<Blake3, SimpleArchive>>>,
last_set: Option<Value<Handle<Blake3, SimpleArchive>>>,
}
fn scan_pile_records(path: &std::path::Path) -> Result<Vec<RawBranchRecord>> {
let mut file = std::fs::File::open(path)?;
let file_len = file.metadata()?.len();
let mut records = Vec::new();
let mut offset: u64 = 0;
let mut buf = [0u8; RECORD_LEN as usize];
while offset + RECORD_LEN <= file_len {
file.seek(SeekFrom::Start(offset))?;
if file.read_exact(&mut buf).is_err() {
break;
}
let magic: [u8; 16] = buf[0..16].try_into().unwrap();
if magic == MAGIC_MARKER_BLOB.raw() {
let len = u64::from_ne_bytes(buf[24..32].try_into().unwrap());
let pad = blob_padding(len);
offset = offset
.checked_add(RECORD_LEN)
.and_then(|o| o.checked_add(len))
.and_then(|o| o.checked_add(pad))
.ok_or_else(|| anyhow::anyhow!("pile too large"))?;
continue;
}
if magic == MAGIC_MARKER_BRANCH.raw() {
let raw_id: [u8; 16] = buf[16..32].try_into().unwrap();
let Some(id) = Id::new(raw_id) else { break };
let raw_handle: [u8; 32] = buf[32..64].try_into().unwrap();
let meta: Value<Handle<Blake3, SimpleArchive>> = Value::new(raw_handle);
records.push(RawBranchRecord {
offset,
branch_id: id,
kind: RecordKind::Set,
meta_handle: Some(meta),
});
offset += RECORD_LEN;
continue;
}
if magic == MAGIC_MARKER_BRANCH_TOMBSTONE.raw() {
let raw_id: [u8; 16] = buf[16..32].try_into().unwrap();
let Some(id) = Id::new(raw_id) else { break };
records.push(RawBranchRecord {
offset,
branch_id: id,
kind: RecordKind::Tombstone,
meta_handle: None,
});
offset += RECORD_LEN;
continue;
}
break;
}
Ok(records)
}
fn collapse_branch_states(records: &[RawBranchRecord]) -> HashMap<Id, BranchState> {
let mut states: HashMap<Id, BranchState> = HashMap::new();
for rec in records {
let entry = states.entry(rec.branch_id).or_insert(BranchState {
kind: rec.kind,
meta: rec.meta_handle,
last_set: if rec.kind == RecordKind::Set {
rec.meta_handle
} else {
None
},
});
entry.kind = rec.kind;
match rec.kind {
RecordKind::Set => {
entry.meta = rec.meta_handle;
entry.last_set = rec.meta_handle;
}
RecordKind::Tombstone => {
entry.meta = None;
}
}
}
states
}
#[derive(Clone, Debug)]
struct CommitInfo {
parents: Vec<Value<Handle<Blake3, SimpleArchive>>>,
content: Option<Value<Handle<Blake3, SimpleArchive>>>,
metadata: Option<Value<Handle<Blake3, SimpleArchive>>>,
message: Option<Value<Handle<Blake3, LongString>>>,
short_message: Option<String>,
timestamp: Option<Value<triblespace_core::value::schemas::time::NsTAIInterval>>,
signed_by: Option<[u8; 32]>,
}
fn read_commit_fields(commit: &TribleSet) -> CommitInfo {
use triblespace_core::repo;
use triblespace_core::value::schemas::ed25519 as ed;
use triblespace_core::value::schemas::shortstring::ShortString;
use triblespace_core::value::schemas::time::NsTAIInterval;
let content_attr = repo::content.id();
let metadata_attr = repo::metadata.id();
let parent_attr = repo::parent.id();
let message_attr = repo::message.id();
let short_message_attr = repo::short_message.id();
let timestamp_attr = repo::timestamp.id();
let created_at_attr = triblespace_core::metadata::created_at.id();
let signed_by_attr = repo::signed_by.id();
let mut info = CommitInfo {
parents: Vec::new(),
content: None,
metadata: None,
message: None,
short_message: None,
timestamp: None,
signed_by: None,
};
for t in commit.iter() {
let a = *t.a();
if a == parent_attr {
info.parents
.push(*t.v::<Handle<Blake3, SimpleArchive>>());
} else if a == content_attr {
info.content = Some(*t.v::<Handle<Blake3, SimpleArchive>>());
} else if a == metadata_attr {
info.metadata = Some(*t.v::<Handle<Blake3, SimpleArchive>>());
} else if a == message_attr {
info.message = Some(*t.v::<Handle<Blake3, LongString>>());
} else if a == short_message_attr {
let v: Value<ShortString> = *t.v();
info.short_message = v.try_from_value().ok();
} else if a == timestamp_attr || a == created_at_attr {
info.timestamp = Some(*t.v::<NsTAIInterval>());
} else if a == signed_by_attr {
let v: Value<ed::ED25519PublicKey> = *t.v();
info.signed_by = Some(v.raw);
}
}
info
}
fn blob_padding(len: u64) -> u64 {
let rem = len % RECORD_LEN;
if rem == 0 {
0
} else {
RECORD_LEN - rem
}
}
fn extract_repo_head(meta: &TribleSet) -> Option<Value<Handle<Blake3, SimpleArchive>>> {
use triblespace::prelude::blobschemas::SimpleArchive;
use triblespace::prelude::valueschemas::Handle;
use triblespace_core::repo;
use triblespace_core::value::schemas::hash::Blake3;
use triblespace_core::value::Value;
let head_attr = repo::head.id();
let mut head_handle: Option<Value<Handle<Blake3, SimpleArchive>>> = None;
for t in meta.iter() {
if t.a() == &head_attr {
let h: Value<Handle<Blake3, SimpleArchive>> = *t.v();
if head_handle.replace(h).is_some() {
return None;
}
}
}
head_handle
}
fn parse_branch_id_hex(s: &str) -> Result<Id> {
let raw = hex::decode(s).map_err(|e| anyhow::anyhow!("branch id hex decode failed: {e}"))?;
let raw: [u8; 16] = raw
.as_slice()
.try_into()
.map_err(|_| anyhow::anyhow!("branch id must be 16 bytes (32 hex chars)"))?;
Id::new(raw).ok_or_else(|| anyhow::anyhow!("branch id cannot be nil"))
}
fn parse_blake3_handle(s: &str) -> Result<Value<Handle<Blake3, SimpleArchive>>> {
let s = s.trim();
let hex = match s.split_once(':') {
Some((proto, rest)) => {
if proto.eq_ignore_ascii_case("blake3") {
rest
} else {
return Err(anyhow::anyhow!("unsupported handle protocol: {proto}"));
}
}
None => s,
};
let raw = hex::decode(hex).map_err(|e| anyhow::anyhow!("handle hex decode failed: {e}"))?;
let raw: [u8; 32] = raw
.as_slice()
.try_into()
.map_err(|_| anyhow::anyhow!("handle must be 32 bytes (64 hex chars)"))?;
Ok(Value::new(raw))
}
fn parse_blake3_handle_opt(s: &str) -> Result<Option<Value<Handle<Blake3, SimpleArchive>>>> {
let s = s.trim();
if s == "-" || s.eq_ignore_ascii_case("none") {
return Ok(None);
}
Ok(Some(parse_blake3_handle(s)?))
}
fn consolidate_groups(
groups: &std::collections::BTreeMap<String, Vec<(Id, Option<Value<Handle<Blake3, SimpleArchive>>>)>>,
statuses: &HashMap<Id, &str>,
reader: &triblespace_core::repo::pile::PileReader<Blake3>,
repo: &mut Repository<Pile<Blake3>>,
key: &ed25519_dalek::SigningKey,
dry_run: bool,
delete_sources: bool,
) -> Result<usize> {
use std::collections::HashSet;
let parent_attr = triblespace_core::repo::parent.id();
let mut created_count: usize = 0;
for (name, members) in groups {
let heads: Vec<Value<Handle<Blake3, SimpleArchive>>> =
members.iter().filter_map(|(_, h)| *h).collect();
if heads.is_empty() {
if !dry_run && delete_sources {
let cleaned = tombstone_branches(repo, members, None)?;
if cleaned > 0 {
println!("\nname group \"{name}\" ({} branches): all empty, cleaned up {cleaned} branch(es)", members.len());
} else {
println!("\nname group \"{name}\" ({} branches): all empty, skipping", members.len());
}
} else {
println!("\nname group \"{name}\" ({} branches): all empty, skipping", members.len());
}
continue;
}
println!("\nname group \"{name}\" ({} branches, {} with heads):", members.len(), heads.len());
for (bid, head) in members {
let status = statuses.get(bid).copied().unwrap_or("?");
if let Some(h) = head {
let hh: Value<Hash<Blake3>> = Handle::to_hash(*h);
let hex: String = hh.from_value();
println!(" - {bid:X} [{status}] head={}", &hex[..23]);
} else {
println!(" - {bid:X} [{status}] <no head>");
}
}
let unique_heads: Vec<Value<Handle<Blake3, SimpleArchive>>> = {
let mut seen: HashSet<[u8; 32]> = HashSet::new();
heads.iter().copied().filter(|h| seen.insert(h.raw)).collect()
};
let mut subsumed: HashSet<[u8; 32]> = HashSet::new();
if unique_heads.len() > 1 {
for i in 0..unique_heads.len() {
if subsumed.contains(&unique_heads[i].raw) { continue; }
for j in 0..unique_heads.len() {
if i == j { continue; }
if subsumed.contains(&unique_heads[j].raw) { continue; }
match is_ancestor_of(unique_heads[i], unique_heads[j], reader, &parent_attr) {
Ok(true) => {
subsumed.insert(unique_heads[i].raw);
let hh: Value<Hash<Blake3>> = Handle::to_hash(unique_heads[i]);
let hex: String = hh.from_value();
println!(" ({}... subsumed)", &hex[..23]);
break;
}
Ok(false) => {}
Err(e) => {
eprintln!(" warning: ancestry check failed: {e:#}");
}
}
}
}
}
let non_subsumed: Vec<Value<Handle<Blake3, SimpleArchive>>> = unique_heads
.iter()
.copied()
.filter(|h| !subsumed.contains(&h.raw))
.collect();
if non_subsumed.is_empty() {
println!(" -> all heads subsumed, skipping");
continue;
}
if non_subsumed.len() == 1 {
let dominated_head = non_subsumed[0];
let already_active = members.iter().any(|(bid, head)| {
head.as_ref() == Some(&dominated_head)
&& statuses.get(bid).copied() == Some("active")
});
if already_active {
if dry_run {
println!(" -> already consolidated (active branch has the sole non-subsumed head)");
} else if delete_sources {
let keeper = members.iter().find(|(bid, head)| {
head.as_ref() == Some(&dominated_head)
&& statuses.get(bid).copied() == Some("active")
}).map(|(b, _)| *b);
let cleaned = tombstone_branches(repo, members, keeper)?;
if cleaned > 0 {
println!(" -> already consolidated, cleaned up {cleaned} redundant branch(es)");
} else {
println!(" -> already consolidated, skipping");
}
} else {
println!(" -> already consolidated, skipping");
}
continue;
}
}
if dry_run {
println!(" -> would merge {} non-subsumed head(s) into \"{name}\"", non_subsumed.len());
continue;
}
let commit_handle = if non_subsumed.len() == 1 {
println!(" -> single non-subsumed head, creating branch directly");
non_subsumed[0]
} else {
println!(" -> merging {} non-subsumed heads", non_subsumed.len());
let commit_set = triblespace_core::repo::commit::commit_metadata(
key,
non_subsumed.clone(),
None,
None,
None,
);
repo.storage_mut()
.put(commit_set.to_blob())
.map_err(|e| anyhow::anyhow!("failed to put commit blob: {e:?}"))?
};
let new_id = *repo
.create_branch_with_key(name, Some(commit_handle), key.clone())
.map_err(|e| anyhow::anyhow!("failed to create branch '{name}': {e:?}"))?;
println!(" created branch '{name}' with id {new_id:X}");
created_count += 1;
if delete_sources {
let cleaned = tombstone_branches(repo, members, Some(new_id))?;
println!(" deleted {cleaned} source branch(es)");
}
}
Ok(created_count)
}
fn tombstone_branches(
repo: &mut Repository<Pile<Blake3>>,
members: &[(Id, Option<Value<Handle<Blake3, SimpleArchive>>>)],
keeper: Option<Id>,
) -> Result<usize> {
let mut count = 0;
for (bid, _) in members {
if Some(*bid) == keeper { continue; }
let old = repo.storage_mut().head(*bid)?;
match repo.storage_mut().update(*bid, old, None)? {
triblespace_core::repo::PushResult::Success() => { count += 1; }
triblespace_core::repo::PushResult::Conflict(_) => {
eprintln!(" warning: branch {bid:X} advanced concurrently; skipping delete");
}
}
}
Ok(count)
}
fn is_ancestor_of(
ancestor: Value<Handle<Blake3, SimpleArchive>>,
descendant: Value<Handle<Blake3, SimpleArchive>>,
reader: &impl BlobStoreGet<Blake3>,
parent_attr: &Id,
) -> Result<bool> {
use std::collections::HashSet;
let mut visited: HashSet<[u8; 32]> = HashSet::new();
let mut stack: Vec<Value<Handle<Blake3, SimpleArchive>>> = vec![descendant];
while let Some(current) = stack.pop() {
if current.raw == ancestor.raw {
return Ok(true);
}
if !visited.insert(current.raw) {
continue;
}
let commit: TribleSet = match reader.get(current) {
Ok(c) => c,
Err(_) => continue, };
for t in commit.iter() {
if t.a() == parent_attr {
stack.push(*t.v::<Handle<Blake3, SimpleArchive>>());
}
}
}
Ok(false)
}
fn load_branch_name(
reader: &impl BlobStoreGet<Blake3>,
meta: &TribleSet,
) -> Result<Option<String>> {
let name_attr = triblespace_core::metadata::name.id();
let mut handle_opt: Option<BranchNameHandle> = None;
for t in meta.iter() {
if t.a() == &name_attr {
let h: BranchNameHandle = *t.v();
if handle_opt.replace(h).is_some() {
return Ok(None);
}
}
}
let Some(handle) = handle_opt else {
return Ok(None);
};
let view: View<str> = reader
.get(handle)
.map_err(|err| anyhow::anyhow!("read branch name blob: {err:?}"))?;
Ok(Some(view.as_ref().to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn parse_signing_key_hex_and_file() {
let mut seed = [0u8; 32];
for i in 0..32 {
seed[i] = i as u8;
}
let hex = hex::encode(seed);
let mut f = NamedTempFile::new().expect("tmpfile");
writeln!(f, "{}", hex).expect("write");
let path = f.path().to_path_buf();
let key = load_signing_key(&Some(path)).expect("parse file");
let expected = ed25519_dalek::SigningKey::from_bytes(&seed);
assert_eq!(key.to_bytes(), expected.to_bytes());
}
}