pub use vta_sdk::contexts::ContextRecord;
use chrono::Utc;
use crate::error::AppError;
use crate::store::KeyspaceHandle;
fn ctx_key(id: &str) -> String {
format!("ctx:{id}")
}
pub async fn get_context(ks: &KeyspaceHandle, id: &str) -> Result<Option<ContextRecord>, AppError> {
ks.get(ctx_key(id)).await
}
pub async fn store_context(ks: &KeyspaceHandle, record: &ContextRecord) -> Result<(), AppError> {
ks.insert(ctx_key(&record.id), record).await
}
pub async fn store_new_context(
ks: &KeyspaceHandle,
record: &ContextRecord,
) -> Result<bool, AppError> {
ks.insert_if_absent(ctx_key(&record.id), record).await
}
pub async fn delete_context(ks: &KeyspaceHandle, id: &str) -> Result<(), AppError> {
ks.remove(ctx_key(id)).await
}
pub async fn list_contexts(ks: &KeyspaceHandle) -> Result<Vec<ContextRecord>, AppError> {
let raw = ks.prefix_iter_raw("ctx:").await?;
let mut records = Vec::with_capacity(raw.len());
let mut skipped = 0usize;
for (key, value) in raw {
match serde_json::from_slice::<ContextRecord>(&value) {
Ok(record) => records.push(record),
Err(e) => {
skipped += 1;
tracing::warn!(
key = %String::from_utf8_lossy(&key),
error = %e,
"skipping undeserializable context row in list_contexts"
);
}
}
}
if skipped > 0 {
tracing::warn!(skipped, "list_contexts skipped corrupt rows");
}
Ok(records)
}
pub async fn allocate_context_index(
ks: &KeyspaceHandle,
base_prefix: &str,
counter_key: &str,
) -> Result<(u32, String), AppError> {
let current = vti_common::store::counter::allocate_u32(ks, counter_key).await?;
let base_path = format!("{base_prefix}/{current}'");
Ok((current, base_path))
}
pub async fn create_context(
contexts_ks: &KeyspaceHandle,
id: &str,
name: &str,
) -> Result<ContextRecord, Box<dyn std::error::Error>> {
let (index, base_path) = allocate_context_index(contexts_ks, CONTEXT_KEY_BASE, "ctx_counter")
.await
.map_err(|e| format!("{e}"))?;
let now = Utc::now();
let record = ContextRecord {
id: id.to_string(),
name: name.to_string(),
did: None,
description: None,
parent: None,
base_path,
index,
created_at: now,
updated_at: now,
};
if !store_new_context(contexts_ks, &record)
.await
.map_err(|e| format!("{e}"))?
{
return Err(format!("context already exists: {id}").into());
}
Ok(record)
}
pub const CONTEXT_KEY_BASE: &str = "m/26'/2'";
#[cfg(test)]
mod tests {
use super::*;
use crate::store::Store;
use vti_common::config::StoreConfig;
fn temp_ks() -> (KeyspaceHandle, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store = Store::open(&StoreConfig {
data_dir: dir.path().to_path_buf(),
})
.expect("open store");
(
store
.keyspace(crate::keyspaces::CONTEXTS)
.expect("keyspace"),
dir,
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn allocate_context_index_is_collision_free_under_concurrency() {
let (ks, _dir) = temp_ks();
let n = 64usize;
let mut handles = Vec::with_capacity(n);
for _ in 0..n {
let ks = ks.clone();
handles.push(tokio::spawn(async move {
allocate_context_index(&ks, CONTEXT_KEY_BASE, "ctx_counter")
.await
.expect("alloc")
}));
}
let mut paths = std::collections::HashSet::with_capacity(n);
for h in handles {
let (_, base_path) = h.await.expect("join");
assert!(
paths.insert(base_path.clone()),
"duplicate context base path {base_path}"
);
}
assert_eq!(paths.len(), n);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_same_id_creates_admit_exactly_one() {
let (ks, _dir) = temp_ks();
let mut handles = Vec::new();
for _ in 0..16 {
let ks = ks.clone();
handles.push(tokio::spawn(async move {
create_context(&ks, "contested", "Contested").await.ok()
}));
}
let mut winners = Vec::new();
for h in handles {
if let Some(rec) = h.await.expect("join") {
winners.push(rec);
}
}
assert_eq!(winners.len(), 1, "exactly one same-id create may win");
let stored = get_context(&ks, "contested")
.await
.expect("get")
.expect("record exists");
assert_eq!(
stored.base_path, winners[0].base_path,
"stored record must be the winner's — no overwrite by losers"
);
}
}