use chrono::Utc;
use khive_runtime::KhiveRuntime;
use khive_storage::types::{SqlStatement, SqlValue};
use crate::error::VcsError;
use crate::snapshot::load_archive;
use crate::types::{KgBranch, SnapshotId};
pub async fn create_branch(
runtime: &KhiveRuntime,
namespace: Option<&str>,
name: &str,
from_branch: Option<&str>,
from_snapshot_id: Option<&SnapshotId>,
) -> Result<KgBranch, VcsError> {
let ns = runtime.ns(namespace).to_string();
let created_at = Utc::now().timestamp_micros();
let sql = runtime.sql();
let mut writer = sql
.writer()
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
let head_id = if let Some(snap_id) = from_snapshot_id {
snap_id.clone()
} else {
let source_branch = from_branch.unwrap_or("main");
let row = writer
.query_row(SqlStatement {
sql: "SELECT head_id FROM kg_branches WHERE namespace = ? AND name = ?".to_string(),
params: vec![
SqlValue::Text(ns.clone()),
SqlValue::Text(source_branch.to_string()),
],
label: None,
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
match row {
None => {
return Err(VcsError::BranchNotFound {
namespace: ns,
name: source_branch.to_string(),
})
}
Some(r) => match r.get("head_id") {
Some(SqlValue::Text(s)) => SnapshotId::from_prefixed(s)?,
_ => return Err(VcsError::Internal("branch head_id is not text".to_string())),
},
}
};
writer
.execute(SqlStatement {
sql: "INSERT INTO kg_branches (namespace, name, head_id, created_at, updated_at) \
VALUES (?, ?, ?, ?, ?)"
.to_string(),
params: vec![
SqlValue::Text(ns.clone()),
SqlValue::Text(name.to_string()),
SqlValue::Text(head_id.as_str().to_string()),
SqlValue::Integer(created_at),
SqlValue::Integer(created_at),
],
label: Some("vcs:create_branch".to_string()),
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
Ok(KgBranch {
namespace: ns,
name: name.to_string(),
head_id,
created_at,
updated_at: created_at,
})
}
pub async fn list_branches(
runtime: &KhiveRuntime,
namespace: Option<&str>,
) -> Result<Vec<KgBranch>, VcsError> {
let ns = runtime.ns(namespace).to_string();
let sql = runtime.sql();
let mut reader = sql
.reader()
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
let rows = reader
.query_all(SqlStatement {
sql: "SELECT namespace, name, head_id, created_at, updated_at \
FROM kg_branches WHERE namespace = ? ORDER BY created_at ASC"
.to_string(),
params: vec![SqlValue::Text(ns)],
label: Some("vcs:list_branches".to_string()),
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
rows.into_iter().map(|r| row_to_branch(&r)).collect()
}
pub async fn get_branch(
runtime: &KhiveRuntime,
namespace: Option<&str>,
name: &str,
) -> Result<Option<KgBranch>, VcsError> {
let ns = runtime.ns(namespace).to_string();
let sql = runtime.sql();
let mut reader = sql
.reader()
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
let row = reader
.query_row(SqlStatement {
sql: "SELECT namespace, name, head_id, created_at, updated_at \
FROM kg_branches WHERE namespace = ? AND name = ?"
.to_string(),
params: vec![SqlValue::Text(ns), SqlValue::Text(name.to_string())],
label: None,
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
row.map(|r| row_to_branch(&r)).transpose()
}
pub async fn checkout(
runtime: &KhiveRuntime,
namespace: Option<&str>,
branch_name: Option<&str>,
snapshot_id: Option<&SnapshotId>,
force: bool,
) -> Result<CheckoutSummary, VcsError> {
let ns = runtime.ns(namespace).to_string();
let (resolved_branch, resolved_snap_id) =
resolve_checkout_target(runtime, &ns, branch_name, snapshot_id).await?;
if !force {
let state = crate::snapshot::load_vcs_state(runtime, &ns).await?;
if state.dirty {
let count = estimate_uncommitted_count(runtime, &ns).await?;
return Err(VcsError::UncommittedChanges { count });
}
}
let archive = load_archive(runtime, &resolved_snap_id).await?;
let entities_count = archive.entities.len();
let edges_count = archive.edges.len();
wipe_namespace(runtime, &ns).await?;
runtime
.import_kg(&archive, Some(&ns))
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
let now = Utc::now().timestamp_micros();
let sql = runtime.sql();
let mut writer = sql
.writer()
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
writer
.execute(SqlStatement {
sql: "INSERT INTO kg_vcs_state (namespace, current_branch, last_committed_id, dirty) \
VALUES (?, ?, ?, 0) \
ON CONFLICT(namespace) \
DO UPDATE SET current_branch=excluded.current_branch, \
last_committed_id=excluded.last_committed_id, \
dirty=0"
.to_string(),
params: vec![
SqlValue::Text(ns.clone()),
match &resolved_branch {
Some(b) => SqlValue::Text(b.clone()),
None => SqlValue::Null,
},
SqlValue::Text(resolved_snap_id.as_str().to_string()),
],
label: Some("vcs:checkout_state".to_string()),
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
drop(writer);
if let Some(ref b) = resolved_branch {
let _ = sql
.writer()
.await
.map_err(|e| VcsError::Storage(e.to_string()))?
.execute(SqlStatement {
sql: "UPDATE kg_branches SET head_id = ?, updated_at = ? \
WHERE namespace = ? AND name = ?"
.to_string(),
params: vec![
SqlValue::Text(resolved_snap_id.as_str().to_string()),
SqlValue::Integer(now),
SqlValue::Text(ns.clone()),
SqlValue::Text(b.clone()),
],
label: None,
})
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
}
Ok(CheckoutSummary {
branch_name: resolved_branch,
snapshot_id: resolved_snap_id,
entities_restored: entities_count,
edges_restored: edges_count,
vector_index_status: VectorIndexStatus::Synchronous,
})
}
#[derive(Debug)]
pub struct CheckoutSummary {
pub branch_name: Option<String>,
pub snapshot_id: SnapshotId,
pub entities_restored: usize,
pub edges_restored: usize,
pub vector_index_status: VectorIndexStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorIndexStatus {
Synchronous,
Rebuilding,
}
async fn resolve_checkout_target(
runtime: &KhiveRuntime,
namespace: &str,
branch_name: Option<&str>,
snapshot_id: Option<&SnapshotId>,
) -> Result<(Option<String>, SnapshotId), VcsError> {
match (branch_name, snapshot_id) {
(_, Some(sid)) => Ok((None, sid.clone())),
(Some(branch), None) => {
let branch_row = get_branch(runtime, Some(namespace), branch).await?;
match branch_row {
None => Err(VcsError::BranchNotFound {
namespace: namespace.to_string(),
name: branch.to_string(),
}),
Some(b) => Ok((Some(branch.to_string()), b.head_id)),
}
}
(None, None) => {
let state = crate::snapshot::load_vcs_state(runtime, namespace).await?;
match state.current_branch {
Some(b) => {
let branch_row = get_branch(runtime, Some(namespace), &b).await?;
match branch_row {
None => Err(VcsError::BranchNotFound {
namespace: namespace.to_string(),
name: b,
}),
Some(br) => Ok((Some(b), br.head_id)),
}
}
None => Err(VcsError::Internal(
"no current branch and no target specified".to_string(),
)),
}
}
}
}
async fn estimate_uncommitted_count(
runtime: &KhiveRuntime,
namespace: &str,
) -> Result<usize, VcsError> {
let entities = runtime
.list_entities(Some(namespace), None, u32::MAX)
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
Ok(entities.len())
}
async fn wipe_namespace(runtime: &KhiveRuntime, namespace: &str) -> Result<(), VcsError> {
let entities = runtime
.list_entities(Some(namespace), None, u32::MAX)
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
for e in &entities {
runtime
.delete_entity(Some(namespace), e.id, true)
.await
.map_err(|e| VcsError::Storage(e.to_string()))?;
}
Ok(())
}
fn row_to_branch(row: &khive_storage::types::SqlRow) -> Result<KgBranch, VcsError> {
let namespace = match row.get("namespace") {
Some(SqlValue::Text(s)) => s.clone(),
_ => return Err(VcsError::Internal("missing namespace".into())),
};
let name = match row.get("name") {
Some(SqlValue::Text(s)) => s.clone(),
_ => return Err(VcsError::Internal("missing name".into())),
};
let head_id = match row.get("head_id") {
Some(SqlValue::Text(s)) => SnapshotId::from_prefixed(s)?,
_ => return Err(VcsError::Internal("missing head_id".into())),
};
let created_at = match row.get("created_at") {
Some(SqlValue::Integer(n)) => *n,
_ => 0,
};
let updated_at = match row.get("updated_at") {
Some(SqlValue::Integer(n)) => *n,
_ => created_at,
};
Ok(KgBranch {
namespace,
name,
head_id,
created_at,
updated_at,
})
}