use std::collections::HashMap;
use chroma::{
types::{
GetResponse, Include, IncludeList, Metadata, MetadataComparison, MetadataExpression,
MetadataValue, PrimitiveOperator, Where,
},
ChromaCollection,
};
use crate::{AgentID, EmbeddingService, FileWrite, Transaction, TransactionChunk, TransactionID};
use claudius::MessageParam;
/// Errors produced by [`ContextManager`] and the transaction-building helpers in this file.
#[derive(Debug)]
pub enum ContextManagerError {
/// A Chroma collection call failed or post-write verification did not succeed; carries the message text.
ChromaError(String),
/// Splitting a transaction into chunks failed.
ChunkingError(crate::TransactionSerializationError),
/// A fresh transaction identifier could not be generated.
GuidError,
/// An `anyhow::Error` converted via `From`; displayed as an embedding error.
EmbeddingError(anyhow::Error),
/// Stored chunks could not be converted back into agent data; carries a description.
LoadAgentError(String),
/// A file operation on agent data was rejected.
FileError(FileSystemError),
}
/// Errors raised by the file operations layered on top of agent transactions.
#[derive(Debug)]
pub enum FileSystemError {
    /// The supplied path failed validation; the payload explains why.
    InvalidPath(String),
    /// No content exists for the requested path; the payload is the path.
    FileNotFound(String),
    /// The search pattern was rejected; the payload explains why.
    InvalidPattern(String),
    /// The text to be replaced was not present; the payload is a description.
    ContentNotFound(String),
}

impl std::fmt::Display for FileSystemError {
    /// Writes a one-line, human-readable description: a category prefix followed by the payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidPath(reason) => write!(f, "Invalid file path: {}", reason),
            Self::FileNotFound(missing) => write!(f, "File not found: {}", missing),
            Self::InvalidPattern(reason) => write!(f, "Invalid search pattern: {}", reason),
            Self::ContentNotFound(reason) => write!(f, "Content not found: {}", reason),
        }
    }
}

// No underlying cause is tracked, so the default `source()` (always `None`) is used.
impl std::error::Error for FileSystemError {}
/// Lets `?` lift a [`FileSystemError`] into [`ContextManagerError::FileError`].
impl From<FileSystemError> for ContextManagerError {
fn from(error: FileSystemError) -> Self {
ContextManagerError::FileError(error)
}
}
impl std::fmt::Display for ContextManagerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ContextManagerError::ChromaError(e) => write!(f, "Chroma error: {}", e),
ContextManagerError::ChunkingError(e) => {
write!(f, "Transaction chunking error: {}", e)
}
ContextManagerError::GuidError => write!(f, "Failed to generate GUID"),
ContextManagerError::EmbeddingError(e) => write!(f, "Embedding error: {}", e),
ContextManagerError::LoadAgentError(e) => write!(f, "Agent loading error: {}", e),
ContextManagerError::FileError(e) => write!(f, "File system error: {}", e),
}
}
}
impl std::error::Error for ContextManagerError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ContextManagerError::ChromaError(_) => None,
ContextManagerError::ChunkingError(e) => Some(e),
ContextManagerError::GuidError => None,
ContextManagerError::EmbeddingError(e) => Some(e.as_ref()),
ContextManagerError::LoadAgentError(_) => None,
ContextManagerError::FileError(e) => Some(e),
}
}
}
/// Lets `?` lift a serialization failure into [`ContextManagerError::ChunkingError`].
impl From<crate::TransactionSerializationError> for ContextManagerError {
fn from(error: crate::TransactionSerializationError) -> Self {
ContextManagerError::ChunkingError(error)
}
}
/// Lets `?` lift any `anyhow::Error` into [`ContextManagerError::EmbeddingError`].
///
/// NOTE(review): every `anyhow::Error` is classified as an embedding error, whatever its origin.
impl From<anyhow::Error> for ContextManagerError {
fn from(error: anyhow::Error) -> Self {
ContextManagerError::EmbeddingError(error)
}
}
/// Reads `field_name` from `metadata` as an owned string.
///
/// # Errors
///
/// Returns [`ContextManagerError::LoadAgentError`] naming the field and `chunk_id` when the key
/// is absent or its value is not a string.
fn extract_metadata_string(
    metadata: &HashMap<String, MetadataValue>,
    field_name: &str,
    chunk_id: &str,
) -> Result<String, ContextManagerError> {
    match metadata.get(field_name) {
        Some(MetadataValue::Str(text)) => Ok(text.clone()),
        _ => Err(ContextManagerError::LoadAgentError(format!(
            "Missing or invalid {} in chunk {}",
            field_name, chunk_id
        ))),
    }
}
fn extract_metadata_u32(
metadata: &HashMap<String, MetadataValue>,
field_name: &str,
chunk_id: &str,
) -> Result<u32, ContextManagerError> {
metadata
.get(field_name)
.and_then(|v| match v {
MetadataValue::Int(i) => Some(*i as u32),
_ => None,
})
.ok_or_else(|| {
ContextManagerError::LoadAgentError(format!(
"Missing or invalid {} in chunk {}",
field_name, chunk_id
))
})
}
/// Reads `field_name` from `metadata` as a `u64`.
///
/// The stored `i64` is reinterpreted with an `as` cast, which mirrors the `as i64` cast applied
/// to `transaction_seq_no` when chunks are persisted.
///
/// # Errors
///
/// Returns [`ContextManagerError::LoadAgentError`] naming the field and `chunk_id` when the key
/// is absent or its value is not an integer.
fn extract_metadata_u64(
    metadata: &HashMap<String, MetadataValue>,
    field_name: &str,
    chunk_id: &str,
) -> Result<u64, ContextManagerError> {
    let parsed = match metadata.get(field_name) {
        Some(MetadataValue::Int(raw)) => Some(*raw as u64),
        _ => None,
    };
    parsed.ok_or_else(|| {
        ContextManagerError::LoadAgentError(format!(
            "Missing or invalid {} in chunk {}",
            field_name, chunk_id
        ))
    })
}
/// Checks that `path` is acceptable as a file path for agent writes.
///
/// The rules are evaluated in order and the first violated rule decides the error message:
/// non-empty, no NUL bytes, no `..` substring, leading `/`, and at most 4096 bytes long.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] describing the first rule that failed.
fn validate_file_path(path: &str) -> Result<(), FileSystemError> {
    let violation = if path.is_empty() {
        Some("File path cannot be empty")
    } else if path.contains('\0') {
        Some("File path cannot contain null bytes")
    } else if path.contains("..") {
        Some("File path cannot contain '..' (path traversal)")
    } else if !path.starts_with('/') {
        Some("File path must start with '/'")
    } else if path.len() > 4096 {
        Some("File path too long (max 4096 characters)")
    } else {
        None
    };

    match violation {
        Some(reason) => Err(FileSystemError::InvalidPath(reason.to_string())),
        None => Ok(()),
    }
}
/// Builds the record id of a chunk by joining the agent id and the three sequence numbers with
/// `:` in the order agent, context, transaction, chunk.
fn generate_chunk_id(
    agent_id: AgentID,
    context_seq_no: u32,
    transaction_seq_no: u64,
    chunk_seq_no: u32,
) -> String {
    let parts = [
        agent_id.to_string(),
        context_seq_no.to_string(),
        transaction_seq_no.to_string(),
        chunk_seq_no.to_string(),
    ];
    parts.join(":")
}
/// Builds a `Where` filter that matches records whose metadata `key` equals `value`.
fn metadata_equals(
    key: impl Into<String>,
    value: impl Into<MetadataValue>,
) -> Where {
    let key = key.into();
    let comparison = MetadataComparison::Primitive(PrimitiveOperator::Equal, value.into());
    Where::Metadata(MetadataExpression { key, comparison })
}
/// Returns the data of the last write to `path` on `mount`, or `None` if it was never written.
///
/// Contexts, their transactions, and each transaction's writes are all scanned back to front,
/// so the first hit is the most recently appended write.
fn find_most_recent_file_content(
    contexts: &[AgentContext],
    mount: crate::MountID,
    path: &str,
) -> Option<String> {
    contexts
        .iter()
        .rev()
        .flat_map(|context| context.transactions.iter().rev())
        .flat_map(|transaction| transaction.writes.iter().rev())
        .find(|write| write.mount == mount && write.path == path)
        .map(|write| write.data.clone())
}
/// One context of an agent: the transactions recorded under a single `context_seq_no`.
#[derive(Debug, Clone)]
pub struct AgentContext {
/// Agent that owns this context.
pub agent_id: AgentID,
/// Sequence number identifying this context within the agent.
pub context_seq_no: u32,
/// Transactions recorded in this context.
pub transactions: Vec<Transaction>,
}
/// In-memory view of everything known about one agent: all of its contexts and their transactions.
#[derive(Debug, Clone)]
pub struct AgentData {
/// Agent this data belongs to.
pub agent_id: AgentID,
/// The agent's contexts; code in this file keeps them sorted by `context_seq_no` when it adds one.
pub contexts: Vec<AgentContext>,
}
impl AgentData {
/// Returns references to every transaction of every context, in stored context order.
pub fn all_transactions(&self) -> Vec<&Transaction> {
    self.contexts
        .iter()
        .flat_map(|context| context.transactions.iter())
        .collect()
}
/// Returns the context with the highest `context_seq_no`, or `None` when there are no contexts.
pub fn latest_context(&self) -> Option<&AgentContext> {
    self.contexts
        .iter()
        .max_by(|left, right| left.context_seq_no.cmp(&right.context_seq_no))
}
/// Returns the first context whose sequence number equals `context_seq_no`, if any.
pub fn get_context(&self, context_seq_no: u32) -> Option<&AgentContext> {
    for candidate in &self.contexts {
        if candidate.context_seq_no == context_seq_no {
            return Some(candidate);
        }
    }
    None
}
/// Starts a [`TransactionBuilder`] for the next transaction in the current (latest) context.
///
/// The builder holds a mutable borrow of this `AgentData` and a shared borrow of
/// `context_manager` for its whole lifetime.
pub fn next_transaction<'a>(
&'a mut self,
context_manager: &'a ContextManager,
) -> TransactionBuilder<'a> {
TransactionBuilder::new_in_current_context(self, context_manager)
}
/// Starts a [`TransactionBuilder`] for the first transaction of a brand-new context.
///
/// The builder holds a mutable borrow of this `AgentData` and a shared borrow of
/// `context_manager` for its whole lifetime.
pub fn new_context<'a>(
&'a mut self,
context_manager: &'a ContextManager,
) -> TransactionBuilder<'a> {
TransactionBuilder::new_in_next_context(self, context_manager)
}
/// Returns the most recently written content of `path` on `mount`.
///
/// `Ok(None)` means the path is valid but has never been written.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] when `path` fails validation.
pub fn get_file_content(
&self,
mount: crate::MountID,
path: &str,
) -> Result<Option<String>, FileSystemError> {
validate_file_path(path)?;
Ok(find_most_recent_file_content(&self.contexts, mount, path))
}
/// Returns every distinct path ever written on `mount`, sorted ascending.
pub fn list_files(&self, mount: crate::MountID) -> Vec<String> {
    let mut paths: Vec<String> = self
        .contexts
        .iter()
        .flat_map(|context| context.transactions.iter())
        .flat_map(|transaction| transaction.writes.iter())
        .filter(|write| write.mount == mount)
        .map(|write| write.path.clone())
        .collect();
    // Sorting first makes duplicates adjacent so `dedup` removes all of them.
    paths.sort();
    paths.dedup();
    paths
}
/// Searches the latest content of every file on `mount` for lines containing `pattern`.
///
/// The match is a plain substring test. Each result pairs a file path with its matching lines,
/// formatted as `<1-based line number>:<line>`. Files with no matches are omitted, as are
/// files whose content cannot be read.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPattern`] when `pattern` is empty.
pub fn search_file_contents(
    &self,
    mount: crate::MountID,
    pattern: &str,
) -> Result<Vec<(String, Vec<String>)>, FileSystemError> {
    if pattern.is_empty() {
        return Err(FileSystemError::InvalidPattern(
            "Search pattern cannot be empty".to_string(),
        ));
    }

    let mut matches = Vec::new();
    for file_path in self.list_files(mount) {
        // Unreadable or missing files are skipped rather than failing the whole search.
        let Ok(Some(content)) = self.get_file_content(mount, &file_path) else {
            continue;
        };
        let hits: Vec<String> = content
            .lines()
            .enumerate()
            .filter(|(_, line)| line.contains(pattern))
            .map(|(index, line)| format!("{}:{}", index + 1, line))
            .collect();
        if !hits.is_empty() {
            matches.push((file_path, hits));
        }
    }
    Ok(matches)
}
}
/// Fluent builder that accumulates messages and file writes for one new transaction.
pub struct TransactionBuilder<'a> {
/// Agent data the transaction is appended to once it has been saved.
agent_data: &'a mut AgentData,
/// Manager used to persist the transaction.
context_manager: &'a ContextManager,
/// Context the new transaction belongs to.
context_seq_no: u32,
/// Sequence number assigned to the new transaction.
transaction_seq_no: u64,
/// Messages collected so far, in insertion order.
msgs: Vec<MessageParam>,
/// File writes buffered so far, in insertion order.
writes: Vec<FileWrite>,
}
impl<'a> TransactionBuilder<'a> {
/// Snapshots the builder's current state into a [`Transaction`], cloning the buffered
/// messages and writes.
fn create_transaction(&self) -> Transaction {
Transaction {
agent_id: self.agent_data.agent_id,
context_seq_no: self.context_seq_no,
transaction_seq_no: self.transaction_seq_no,
msgs: self.msgs.clone(),
writes: self.writes.clone(),
}
}
/// Validates `path` and, if it is acceptable, buffers a write of `data` to it on `mount`.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] when the path fails validation; nothing is buffered
/// in that case.
fn add_validated_write<P: Into<String>, D: Into<String>>(
    &mut self,
    mount: crate::MountID,
    path: P,
    data: D,
) -> Result<(), FileSystemError> {
    let path = path.into();
    validate_file_path(&path)?;
    let data = data.into();
    self.writes.push(FileWrite { mount, path, data });
    Ok(())
}
fn new_in_current_context(
agent_data: &'a mut AgentData,
context_manager: &'a ContextManager,
) -> Self {
let (context_seq_no, transaction_seq_no) =
if let Some(latest_context) = agent_data.latest_context() {
let next_transaction_seq = latest_context
.transactions
.iter()
.map(|t| t.transaction_seq_no)
.max()
.unwrap_or(0)
+ 1;
(latest_context.context_seq_no, next_transaction_seq)
} else {
(1, 1)
};
TransactionBuilder {
agent_data,
context_manager,
context_seq_no,
transaction_seq_no,
msgs: Vec::new(),
writes: Vec::new(),
}
}
fn new_in_next_context(
agent_data: &'a mut AgentData,
context_manager: &'a ContextManager,
) -> Self {
let next_context_seq = agent_data
.contexts
.iter()
.map(|c| c.context_seq_no)
.max()
.unwrap_or(0)
+ 1;
TransactionBuilder {
agent_data,
context_manager,
context_seq_no: next_context_seq,
transaction_seq_no: 1, msgs: Vec::new(),
writes: Vec::new(),
}
}
/// Appends one message to the transaction and returns the builder for chaining.
pub fn message(mut self, message: MessageParam) -> Self {
self.msgs.push(message);
self
}
/// Appends several messages, preserving their order, and returns the builder for chaining.
pub fn messages(mut self, messages: Vec<MessageParam>) -> Self {
self.msgs.extend(messages);
self
}
/// Buffers a write of `data` to `path` on `mount` and returns the builder for chaining.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] when `path` fails validation; the builder is
/// consumed in that case.
pub fn write_file<P: Into<String>, D: Into<String>>(
mut self,
mount: crate::MountID,
path: P,
data: D,
) -> Result<Self, FileSystemError> {
self.add_validated_write(mount, path, data)?;
Ok(self)
}
/// Buffers a batch of writes after validating every path up front.
///
/// # Errors
///
/// Returns the [`FileSystemError`] of the first invalid path; none of the writes are buffered
/// in that case.
pub fn write_files(mut self, writes: Vec<FileWrite>) -> Result<Self, FileSystemError> {
    writes
        .iter()
        .try_for_each(|write| validate_file_path(&write.path))?;
    self.writes.extend(writes);
    Ok(self)
}
/// Buffers a write that replaces every occurrence of `old_content` with `new_content` in the
/// current content of `path` on `mount`.
///
/// The current content is resolved through `get_buffered_content`, so writes already buffered
/// in this builder take precedence over committed agent data. This lets a file created or
/// modified earlier in the same transaction be edited, and keeps the edit from reverting
/// those pending changes.
///
/// # Errors
///
/// * [`FileSystemError::InvalidPath`] when `path` fails validation.
/// * [`FileSystemError::FileNotFound`] when the file exists neither in the buffered writes nor
///   in the agent data.
/// * [`FileSystemError::ContentNotFound`] when `old_content` does not occur in the file.
pub fn str_replace_file<P: Into<String>>(
    mut self,
    mount: crate::MountID,
    path: P,
    old_content: &str,
    new_content: &str,
) -> Result<Self, FileSystemError> {
    let path_str = path.into();

    // Buffered writes first, committed data second; this also validates the path.
    let Some(current_content) = self.get_buffered_content(mount, &path_str)? else {
        return Err(FileSystemError::FileNotFound(path_str));
    };

    if !current_content.contains(old_content) {
        return Err(FileSystemError::ContentNotFound(format!(
            "Content '{}' not found in file {}",
            old_content, path_str
        )));
    }

    let updated_content = current_content.replace(old_content, new_content);
    self.writes.push(FileWrite {
        mount,
        path: path_str,
        data: updated_content,
    });
    Ok(self)
}
/// Buffers a write of `content` to `path` on `mount` and returns the builder for chaining.
///
/// As written, this does exactly what `write_file` does: it validates the path and buffers the
/// full content.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] when `path` fails validation.
pub fn insert_file<P: Into<String>, D: Into<String>>(
mut self,
mount: crate::MountID,
path: P,
content: D,
) -> Result<Self, FileSystemError> {
self.add_validated_write(mount, path, content)?;
Ok(self)
}
/// Returns the content of `path` on `mount` as this transaction currently sees it.
///
/// The most recently buffered write in this builder wins; otherwise the lookup falls back to
/// the committed agent data. `Ok(None)` means the file exists in neither place.
///
/// # Errors
///
/// Returns [`FileSystemError::InvalidPath`] when `path` fails validation.
pub fn get_buffered_content(
    &self,
    mount: crate::MountID,
    path: &str,
) -> Result<Option<String>, FileSystemError> {
    validate_file_path(path)?;

    let pending = self
        .writes
        .iter()
        .rev()
        .find(|write| write.mount == mount && write.path == path);

    match pending {
        Some(write) => Ok(Some(write.data.clone())),
        None => self.agent_data.get_file_content(mount, path),
    }
}
/// Convenience wrapper around `get_buffered_content` that returns `None` both when the file
/// does not exist and when `path` is invalid.
pub fn view_file(&self, mount: crate::MountID, path: &str) -> Option<String> {
self.get_buffered_content(mount, path).ok().flatten()
}
/// Returns every distinct path on `mount`, sorted ascending, combining committed files with
/// writes buffered in this builder.
pub fn list_files(&self, mount: crate::MountID) -> Vec<String> {
    let mut paths = self.agent_data.list_files(mount);
    paths.extend(
        self.writes
            .iter()
            .filter(|write| write.mount == mount)
            .map(|write| write.path.clone()),
    );
    // Sorting first makes duplicates adjacent so `dedup` removes all of them.
    paths.sort();
    paths.dedup();
    paths
}
/// Searches every file visible to this transaction on `mount` for lines containing `pattern`.
///
/// Both committed files and buffered writes are searched, with buffered content taking
/// precedence. The match is a plain substring test. Each result pairs a file path with its
/// matching lines, formatted as `<1-based line number>:<line>`. Files with no matches or
/// unreadable content are omitted.
pub fn search_files(&self, mount: crate::MountID, pattern: &str) -> Vec<(String, Vec<String>)> {
    let mut results = Vec::new();
    for file_path in self.list_files(mount) {
        // Unreadable or missing files are skipped rather than failing the whole search.
        let Ok(Some(content)) = self.get_buffered_content(mount, &file_path) else {
            continue;
        };
        let hits: Vec<String> = content
            .lines()
            .enumerate()
            .filter(|(_, line)| line.contains(pattern))
            .map(|(index, line)| format!("{}:{}", index + 1, line))
            .collect();
        if !hits.is_empty() {
            results.push((file_path, hits));
        }
    }
    results
}
/// Summarizes the buffered writes as `(mount, path, data length in bytes)` tuples, in the
/// order they were added.
pub fn get_write_summary(&self) -> Vec<(crate::MountID, String, usize)> {
    self.writes
        .iter()
        .map(|write| (write.mount, write.path.clone(), write.data.len()))
        .collect()
}
/// Persists the built transaction and, only after that succeeds, records it in the in-memory
/// agent data. Returns the nonce produced while persisting.
///
/// # Errors
///
/// Propagates any [`ContextManagerError`] from persisting; the agent data is left untouched
/// in that case.
pub async fn save(mut self) -> Result<String, ContextManagerError> {
    let pending = self.create_transaction();
    match self.context_manager.persist_transaction(&pending).await {
        Ok(nonce) => {
            self.update_agent_data(pending);
            Ok(nonce)
        }
        Err(error) => Err(error),
    }
}
/// Consumes the builder and returns the transaction without persisting it or touching the
/// agent data.
pub fn build(self) -> Transaction {
self.create_transaction()
}
/// Records `transaction` in the in-memory agent data.
///
/// If the builder's context already exists, the transaction is appended to it and the
/// context's transactions are re-sorted by sequence number. Otherwise a new context holding
/// just this transaction is added and the contexts are re-sorted by sequence number.
fn update_agent_data(&mut self, transaction: Transaction) {
    let target_context = self.context_seq_no;
    let existing = self
        .agent_data
        .contexts
        .iter_mut()
        .find(|context| context.context_seq_no == target_context);

    match existing {
        Some(context) => {
            context.transactions.push(transaction);
            context
                .transactions
                .sort_by_key(|recorded| recorded.transaction_seq_no);
        }
        None => {
            let created = AgentContext {
                agent_id: self.agent_data.agent_id,
                context_seq_no: target_context,
                transactions: vec![transaction],
            };
            self.agent_data.contexts.push(created);
            self.agent_data
                .contexts
                .sort_by_key(|context| context.context_seq_no);
        }
    }
}
}
/// Persists agent transactions to a Chroma collection and loads them back.
pub struct ContextManager {
/// Collection that stores one record per transaction chunk.
collection: ChromaCollection,
/// Service used to compute an embedding for each chunk's text before it is stored.
embedding_service: EmbeddingService,
}
impl ContextManager {
/// Creates a manager backed by `collection`, constructing a fresh [`EmbeddingService`].
///
/// # Errors
///
/// Propagates the failure of `EmbeddingService::new()` as a [`ContextManagerError`].
pub fn new(collection: ChromaCollection) -> Result<Self, ContextManagerError> {
let embedding_service = EmbeddingService::new()?;
Ok(ContextManager {
collection,
embedding_service,
})
}
/// Stores `transaction` in the collection and returns the nonce that tags this write.
///
/// The transaction is split into chunks. Each chunk becomes one record carrying its id, an
/// embedding of its text, the text itself, and metadata (`nonce`, `agent_id`,
/// `context_seq_no`, `transaction_seq_no`, `chunk_seq_no`, `total_chunks`). After the add
/// call, chunk 0 is read back to confirm it carries the new nonce.
///
/// # Errors
///
/// * [`ContextManagerError::GuidError`] when no transaction id can be generated.
/// * [`ContextManagerError::ChunkingError`] when chunking fails.
/// * [`ContextManagerError::EmbeddingError`] when embedding fails.
/// * [`ContextManagerError::ChromaError`] when the collection call fails or read-back
///   verification does not find the expected nonce.
pub async fn persist_transaction(
    &self,
    transaction: &Transaction,
) -> Result<String, ContextManagerError> {
    let Some(transaction_id) = TransactionID::generate() else {
        return Err(ContextManagerError::GuidError);
    };
    let nonce = transaction_id.to_string();
    let chunks = transaction.chunk_transaction()?;

    // Build the three parallel per-chunk vectors in a single pass.
    let mut chunk_ids: Vec<String> = Vec::new();
    let mut metadata_entries: Vec<Option<Metadata>> = Vec::new();
    let mut document_texts: Vec<String> = Vec::new();
    for chunk in chunks.iter() {
        chunk_ids.push(generate_chunk_id(
            chunk.agent_id,
            chunk.context_seq_no,
            chunk.transaction_seq_no,
            chunk.chunk_seq_no,
        ));

        let mut metadata: Metadata = HashMap::new();
        metadata.insert("nonce".to_string(), MetadataValue::Str(nonce.clone()));
        metadata.insert(
            "agent_id".to_string(),
            MetadataValue::Str(chunk.agent_id.to_string()),
        );
        metadata.insert(
            "context_seq_no".to_string(),
            MetadataValue::Int(chunk.context_seq_no as i64),
        );
        metadata.insert(
            "transaction_seq_no".to_string(),
            MetadataValue::Int(chunk.transaction_seq_no as i64),
        );
        metadata.insert(
            "chunk_seq_no".to_string(),
            MetadataValue::Int(chunk.chunk_seq_no as i64),
        );
        metadata.insert(
            "total_chunks".to_string(),
            MetadataValue::Int(chunk.total_chunks as i64),
        );
        metadata_entries.push(Some(metadata));

        document_texts.push(chunk.data.clone());
    }

    let document_refs: Vec<&str> = document_texts.iter().map(String::as_str).collect();
    let embeddings = self.embedding_service.embed(&document_refs)?;
    let documents: Vec<Option<String>> = document_texts.into_iter().map(Some).collect();

    self.collection
        .add(chunk_ids, embeddings, Some(documents), None, Some(metadata_entries))
        .await
        .map_err(|e| ContextManagerError::ChromaError(e.to_string()))?;

    // Read chunk 0 back to confirm that this write (identified by its nonce) is what is stored.
    if self.verify_persistence(transaction, &nonce).await? {
        Ok(nonce)
    } else {
        Err(ContextManagerError::ChromaError(
            "Transaction persistence verification failed".to_string(),
        ))
    }
}
/// Checks whether chunk 0 of `transaction` is stored with `expected_nonce` in its metadata.
///
/// Returns `Ok(false)` when the chunk is missing, has no metadata, has no string `nonce`
/// entry, or carries a different nonce.
///
/// # Errors
///
/// Returns [`ContextManagerError::ChromaError`] when the collection lookup fails.
pub async fn verify_persistence(
    &self,
    transaction: &Transaction,
    expected_nonce: &str,
) -> Result<bool, ContextManagerError> {
    let first_chunk_id = generate_chunk_id(
        transaction.agent_id,
        transaction.context_seq_no,
        transaction.transaction_seq_no,
        0,
    );

    let result = self
        .collection
        .get(
            Some(vec![first_chunk_id]),
            None,
            Some(1),
            Some(0),
            Some(IncludeList(vec![Include::Metadata])),
        )
        .await
        .map_err(|e| ContextManagerError::ChromaError(e.to_string()))?;

    if result.ids.len() != 1 {
        return Ok(false);
    }

    let stored_nonce = result
        .metadatas
        .as_ref()
        .and_then(|entries| entries.first())
        .and_then(|entry| entry.as_ref())
        .and_then(|metadata| metadata.get("nonce"));

    Ok(matches!(
        stored_nonce,
        Some(MetadataValue::Str(nonce_str)) if nonce_str == expected_nonce
    ))
}
/// Loads every stored chunk of `agent_id` and assembles it into [`AgentData`].
///
/// Records are fetched in pages of 300 filtered on the `agent_id` metadata field; paging
/// stops at the first page that comes back with fewer than 300 records.
///
/// # Errors
///
/// * [`ContextManagerError::ChromaError`] when a collection lookup fails.
/// * [`ContextManagerError::LoadAgentError`] when a record cannot be converted into a chunk
///   or the chunks cannot be assembled into transactions.
pub async fn load_agent(&self, agent_id: AgentID) -> Result<AgentData, ContextManagerError> {
    const BATCH_SIZE: u32 = 300;

    let mut all_chunks = Vec::new();
    let mut offset: u32 = 0;
    loop {
        let result = self
            .collection
            .get(
                None,
                Some(metadata_equals("agent_id", agent_id.to_string())),
                Some(BATCH_SIZE),
                Some(offset),
                Some(IncludeList(vec![Include::Metadata, Include::Document])),
            )
            .await
            .map_err(|e| ContextManagerError::ChromaError(e.to_string()))?;

        let page = self.convert_chroma_result_to_chunks(result)?;
        let is_last_page = page.len() < BATCH_SIZE as usize;
        all_chunks.extend(page);
        if is_last_page {
            break;
        }
        offset = offset.saturating_add(BATCH_SIZE);
    }

    self.assemble_agent_data(agent_id, all_chunks)
}
/// Converts one `GetResponse` page into [`TransactionChunk`]s, one per returned id.
///
/// For each id, the metadata and document at the same index are required. The chunk's fields
/// are read from the metadata and its data from the document.
///
/// # Errors
///
/// Returns [`ContextManagerError::LoadAgentError`] when the response lacks metadata or
/// documents altogether, when an individual record lacks either, or when a metadata field is
/// missing, of the wrong type, or holds an unparseable agent id.
fn convert_chroma_result_to_chunks(
    &self,
    result: GetResponse,
) -> Result<Vec<TransactionChunk>, ContextManagerError> {
    let GetResponse {
        ids,
        metadatas,
        documents,
        ..
    } = result;

    let metadatas = metadatas.ok_or_else(|| {
        ContextManagerError::LoadAgentError("No metadata returned from Chroma".to_string())
    })?;
    let documents = documents.ok_or_else(|| {
        ContextManagerError::LoadAgentError("No documents returned from Chroma".to_string())
    })?;

    let mut chunks = Vec::new();
    for (index, id) in ids.iter().enumerate() {
        let metadata = match metadatas.get(index) {
            Some(Some(found)) => found,
            _ => {
                return Err(ContextManagerError::LoadAgentError(format!(
                    "Missing metadata for chunk {}",
                    id
                )));
            }
        };
        let document = match documents.get(index) {
            Some(Some(found)) => found,
            _ => {
                return Err(ContextManagerError::LoadAgentError(format!(
                    "Missing document for chunk {}",
                    id
                )));
            }
        };

        let agent_id_str = extract_metadata_string(metadata, "agent_id", id)?;
        let Some(agent_id) = AgentID::from_human_readable(&agent_id_str) else {
            return Err(ContextManagerError::LoadAgentError(format!(
                "Invalid agent_id format in chunk {}: {}",
                id, agent_id_str
            )));
        };
        let context_seq_no = extract_metadata_u32(metadata, "context_seq_no", id)?;
        let transaction_seq_no = extract_metadata_u64(metadata, "transaction_seq_no", id)?;
        let chunk_seq_no = extract_metadata_u32(metadata, "chunk_seq_no", id)?;
        let total_chunks = extract_metadata_u32(metadata, "total_chunks", id)?;

        chunks.push(TransactionChunk {
            agent_id,
            context_seq_no,
            transaction_seq_no,
            chunk_seq_no,
            total_chunks,
            data: document.clone(),
        });
    }
    Ok(chunks)
}
fn assemble_agent_data(
&self,
agent_id: AgentID,
chunks: Vec<TransactionChunk>,
) -> Result<AgentData, ContextManagerError> {
use std::collections::BTreeMap;
let mut context_transaction_chunks: BTreeMap<(u32, u64), Vec<TransactionChunk>> =
BTreeMap::new();
for chunk in chunks {
let key = (chunk.context_seq_no, chunk.transaction_seq_no);
context_transaction_chunks
.entry(key)
.or_default()
.push(chunk);
}
let mut contexts_map: BTreeMap<u32, Vec<Transaction>> = BTreeMap::new();
for ((context_seq_no, _transaction_seq_no), mut transaction_chunks) in
context_transaction_chunks
{
transaction_chunks.sort_by_key(|c| c.chunk_seq_no);
let transaction = Transaction::from_chunks(transaction_chunks).map_err(|e| {
ContextManagerError::LoadAgentError(format!(
"Failed to assemble transaction: {}",
e
))
})?;
contexts_map
.entry(context_seq_no)
.or_default()
.push(transaction);
}
let mut contexts = Vec::new();
for (context_seq_no, mut transactions) in contexts_map {
transactions.sort_by_key(|t| t.transaction_seq_no);
contexts.push(AgentContext {
agent_id,
context_seq_no,
transactions,
});
}
contexts.sort_by_key(|c| c.context_seq_no);
Ok(AgentData { agent_id, contexts })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::AgentID;
use chroma::ChromaHttpClient;
use claudius::{MessageParam, MessageRole};
// Builds the Chroma HTTP client used by every test via `ChromaHttpClient::cloud()`.
// Panics when the client cannot be constructed.
async fn create_test_client() -> ChromaHttpClient {
ChromaHttpClient::cloud().expect("Failed to construct Chroma client")
}
// Fixture: a transaction for a freshly generated agent (context 1, transaction 42) holding a
// single user message and no file writes.
fn create_test_transaction() -> Transaction {
Transaction {
agent_id: AgentID::generate().unwrap(),
context_seq_no: 1,
transaction_seq_no: 42,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Test message".into(),
}],
writes: vec![],
}
}
// Fixture: a `ContextManager` bound to the shared "test_transactions" collection, which is
// created on first use. Panics when the collection or the manager cannot be created.
async fn create_test_context_manager() -> ContextManager {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
ContextManager::new(collection).expect("Failed to create ContextManager")
}
// Smoke test: constructing the manager (client, collection, embedding service) must not panic.
#[tokio::test]
async fn context_manager_creation() {
let _context_manager = create_test_context_manager().await;
}
// Persisting a transaction yields a non-empty nonce; verification succeeds with that nonce and
// reports `false` for an unrelated one.
#[tokio::test]
async fn persist_and_verify_transaction() {
let context_manager = create_test_context_manager().await;
let transaction = create_test_transaction();
let nonce_result = context_manager.persist_transaction(&transaction).await;
assert!(nonce_result.is_ok());
let nonce = nonce_result.unwrap();
assert!(!nonce.is_empty());
let verification_result = context_manager
.verify_persistence(&transaction, &nonce)
.await;
assert!(verification_result.is_ok());
assert!(verification_result.unwrap());
// A freshly generated nonce must not match the stored one.
let wrong_nonce = TransactionID::generate().unwrap().to_string();
let wrong_verification = context_manager
.verify_persistence(&transaction, &wrong_nonce)
.await;
assert!(wrong_verification.is_ok());
assert!(!wrong_verification.unwrap());
}
#[tokio::test]
async fn persist_transaction_with_multiple_chunks() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let large_content = "x".repeat(crate::CHUNK_SIZE_LIMIT * 2);
let mut transaction = create_test_transaction();
transaction.msgs.push(MessageParam {
role: MessageRole::Assistant,
content: large_content.into(),
});
let nonce = context_manager
.persist_transaction(&transaction)
.await
.unwrap();
let verification = context_manager
.verify_persistence(&transaction, &nonce)
.await
.unwrap();
assert!(verification);
}
#[tokio::test]
async fn verify_nonexistent_transaction() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let transaction = create_test_transaction();
let fake_nonce = TransactionID::generate().unwrap().to_string();
let verification = context_manager
.verify_persistence(&transaction, &fake_nonce)
.await
.unwrap();
assert!(!verification);
}
#[tokio::test]
async fn load_agent_single_context_single_transaction() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let transaction = Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Test message for load_agent".into(),
}],
writes: vec![],
};
let _nonce = context_manager
.persist_transaction(&transaction)
.await
.unwrap();
let agent_data = context_manager.load_agent(agent_id).await.unwrap();
assert_eq!(agent_data.agent_id, agent_id);
assert_eq!(agent_data.contexts.len(), 1);
let context = &agent_data.contexts[0];
assert_eq!(context.agent_id, agent_id);
assert_eq!(context.context_seq_no, 1);
assert_eq!(context.transactions.len(), 1);
let loaded_transaction = &context.transactions[0];
assert_eq!(loaded_transaction.agent_id, transaction.agent_id);
assert_eq!(
loaded_transaction.context_seq_no,
transaction.context_seq_no
);
assert_eq!(
loaded_transaction.transaction_seq_no,
transaction.transaction_seq_no
);
assert_eq!(loaded_transaction.msgs.len(), transaction.msgs.len());
}
#[tokio::test]
async fn load_agent_multiple_contexts_multiple_transactions() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let transactions = vec![
Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Context 1, Transaction 1".into(),
}],
writes: vec![],
},
Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 2,
msgs: vec![MessageParam {
role: MessageRole::Assistant,
content: "Context 1, Transaction 2".into(),
}],
writes: vec![],
},
Transaction {
agent_id,
context_seq_no: 2,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Context 2, Transaction 1".into(),
}],
writes: vec![],
},
];
for transaction in &transactions {
let _nonce = context_manager
.persist_transaction(transaction)
.await
.unwrap();
}
let agent_data = context_manager.load_agent(agent_id).await.unwrap();
assert_eq!(agent_data.agent_id, agent_id);
assert_eq!(agent_data.contexts.len(), 2);
let context1 = &agent_data.contexts[0];
assert_eq!(context1.context_seq_no, 1);
assert_eq!(context1.transactions.len(), 2);
assert_eq!(context1.transactions[0].transaction_seq_no, 1);
assert_eq!(context1.transactions[1].transaction_seq_no, 2);
let context2 = &agent_data.contexts[1];
assert_eq!(context2.context_seq_no, 2);
assert_eq!(context2.transactions.len(), 1);
assert_eq!(context2.transactions[0].transaction_seq_no, 1);
let all_transactions = agent_data.all_transactions();
assert_eq!(all_transactions.len(), 3);
let latest_context = agent_data.latest_context().unwrap();
assert_eq!(latest_context.context_seq_no, 2);
let specific_context = agent_data.get_context(1).unwrap();
assert_eq!(specific_context.transactions.len(), 2);
}
#[tokio::test]
async fn load_agent_with_chunked_transactions() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let large_content = "x".repeat(crate::CHUNK_SIZE_LIMIT * 2);
let transaction = Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: large_content.into(),
}],
writes: vec![],
};
let _nonce = context_manager
.persist_transaction(&transaction)
.await
.unwrap();
let agent_data = context_manager.load_agent(agent_id).await.unwrap();
assert_eq!(agent_data.contexts.len(), 1);
let context = &agent_data.contexts[0];
assert_eq!(context.transactions.len(), 1);
let loaded_transaction = &context.transactions[0];
assert_eq!(loaded_transaction.msgs.len(), transaction.msgs.len());
assert_eq!(
loaded_transaction.msgs[0].content,
transaction.msgs[0].content
);
}
#[tokio::test]
async fn load_nonexistent_agent() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let nonexistent_agent_id = AgentID::generate().unwrap();
let agent_data = context_manager
.load_agent(nonexistent_agent_id)
.await
.unwrap();
assert_eq!(agent_data.agent_id, nonexistent_agent_id);
assert!(agent_data.contexts.is_empty());
assert!(agent_data.all_transactions().is_empty());
assert!(agent_data.latest_context().is_none());
}
#[tokio::test]
async fn fluent_transaction_building_next_transaction() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let transaction = Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Initial message".into(),
}],
writes: vec![],
};
context_manager
.persist_transaction(&transaction)
.await
.unwrap();
let mut agent_data = context_manager.load_agent(agent_id).await.unwrap();
let nonce = agent_data
.next_transaction(&context_manager)
.message(MessageParam {
role: MessageRole::Assistant,
content: "Response message".into(),
})
.save()
.await
.unwrap();
assert!(!nonce.is_empty());
assert_eq!(agent_data.contexts.len(), 1);
let context = &agent_data.contexts[0];
assert_eq!(context.transactions.len(), 2);
assert_eq!(context.transactions[1].transaction_seq_no, 2);
assert_eq!(context.transactions[1].msgs.len(), 1);
let reloaded_data = context_manager.load_agent(agent_id).await.unwrap();
assert_eq!(reloaded_data.contexts[0].transactions.len(), 2);
}
#[tokio::test]
async fn fluent_transaction_building_new_context() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let transaction = Transaction {
agent_id,
context_seq_no: 1,
transaction_seq_no: 1,
msgs: vec![MessageParam {
role: MessageRole::User,
content: "Initial message".into(),
}],
writes: vec![],
};
context_manager
.persist_transaction(&transaction)
.await
.unwrap();
let mut agent_data = context_manager.load_agent(agent_id).await.unwrap();
let nonce = agent_data
.new_context(&context_manager)
.message(MessageParam {
role: MessageRole::User,
content: "New context message".into(),
})
.save()
.await
.unwrap();
assert!(!nonce.is_empty());
assert_eq!(agent_data.contexts.len(), 2);
let new_context = &agent_data.contexts[1];
assert_eq!(new_context.context_seq_no, 2);
assert_eq!(new_context.transactions.len(), 1);
assert_eq!(new_context.transactions[0].transaction_seq_no, 1);
let reloaded_data = context_manager.load_agent(agent_id).await.unwrap();
assert_eq!(reloaded_data.contexts.len(), 2);
assert_eq!(reloaded_data.contexts[1].context_seq_no, 2);
}
#[tokio::test]
async fn fluent_transaction_building_with_file_writes() {
let client = create_test_client().await;
let collection = client
.get_or_create_collection("test_transactions", None, None)
.await
.expect("Failed to create Chroma collection");
let context_manager =
ContextManager::new(collection).expect("Failed to create ContextManager");
let agent_id = AgentID::generate().unwrap();
let mount_id = crate::MountID::generate().unwrap();
let mut agent_data = AgentData {
agent_id,
contexts: vec![],
};
let nonce = agent_data
.next_transaction(&context_manager)
.message(MessageParam {
role: MessageRole::User,
content: "Create some files".into(),
})
.write_file(mount_id, "/test.txt", "Hello, world!")
.unwrap()
.write_file(mount_id, "/config.json", r#"{"setting": "value"}"#)
.unwrap()
.save()
.await
.unwrap();
assert!(!nonce.is_empty());
assert_eq!(agent_data.contexts.len(), 1);
let context = &agent_data.contexts[0];
assert_eq!(context.transactions.len(), 1);
let transaction = &context.transactions[0];
assert_eq!(transaction.writes.len(), 2);
assert_eq!(transaction.writes[0].path, "/test.txt");
assert_eq!(transaction.writes[0].data, "Hello, world!");
assert_eq!(transaction.writes[1].path, "/config.json");
let reloaded_data = context_manager.load_agent(agent_id).await.unwrap();
let reloaded_transaction = &reloaded_data.contexts[0].transactions[0];
assert_eq!(reloaded_transaction.writes.len(), 2);
}
/// Combines a batch of messages (`messages`) with one extra message
/// (`message`) in a single transaction and checks that all three are stored
/// in the order they were supplied.
#[tokio::test]
async fn fluent_transaction_building_multiple_messages() {
    let client = create_test_client().await;
    let collection = client
        .get_or_create_collection("test_transactions", None, None)
        .await
        .expect("Failed to create Chroma collection");
    let context_manager =
        ContextManager::new(collection).expect("Failed to create ContextManager");

    let agent_id = AgentID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: vec![],
    };

    let messages = vec![
        MessageParam {
            role: MessageRole::User,
            content: "First message".into(),
        },
        MessageParam {
            role: MessageRole::Assistant,
            content: "Second message".into(),
        },
    ];
    let trailing = MessageParam {
        role: MessageRole::User,
        content: "Third message".into(),
    };

    let nonce = agent_data
        .next_transaction(&context_manager)
        .messages(messages.clone())
        .message(trailing)
        .save()
        .await
        .unwrap();
    assert!(!nonce.is_empty());

    let stored = &agent_data.contexts[0].transactions[0].msgs;
    assert_eq!(stored.len(), 3);

    // Roles of all three stored messages, in insertion order.
    let expected_roles = [
        MessageRole::User,
        MessageRole::Assistant,
        MessageRole::User,
    ];
    for (msg, expected) in stored.iter().zip(expected_roles) {
        assert_eq!(msg.role, expected);
    }

    // Contents are compared only for the two batch messages; the zip stops at
    // the end of `messages`.
    for (msg, sent) in stored.iter().zip(&messages) {
        assert_eq!(msg.content, sent.content);
    }
}
/// Calls `build()` instead of `save()` and checks that the returned
/// transaction is fully populated while neither the in-memory agent data nor
/// the data loaded through the context manager gains a context.
#[tokio::test]
async fn fluent_transaction_building_build_without_save() {
    let client = create_test_client().await;
    let collection = client
        .get_or_create_collection("test_transactions", None, None)
        .await
        .expect("Failed to create Chroma collection");
    let context_manager =
        ContextManager::new(collection).expect("Failed to create ContextManager");

    let agent_id = AgentID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: vec![],
    };

    let draft = MessageParam {
        role: MessageRole::User,
        content: "Test message".into(),
    };
    let transaction = agent_data
        .next_transaction(&context_manager)
        .message(draft)
        .build();

    // The built transaction belongs to the agent's first context and is its
    // first transaction.
    assert_eq!(transaction.agent_id, agent_id);
    assert_eq!(
        (transaction.context_seq_no, transaction.transaction_seq_no),
        (1, 1)
    );
    assert_eq!(transaction.msgs.len(), 1);

    // Nothing was recorded locally...
    assert!(agent_data.contexts.is_empty());

    // ...and nothing comes back when the agent is loaded.
    let loaded = context_manager.load_agent(agent_id).await.unwrap();
    assert!(loaded.contexts.is_empty());
}
/// Checks the sequence numbers assigned by the builder: the first transaction
/// is (context 1, transaction 1), a follow-up in the same context is
/// (1, 2), and `new_context` starts over at (2, 1).
#[tokio::test]
async fn fluent_transaction_building_sequence_numbers() {
    let client = create_test_client().await;
    let collection = client
        .get_or_create_collection("test_transactions", None, None)
        .await
        .expect("Failed to create Chroma collection");
    let context_manager =
        ContextManager::new(collection).expect("Failed to create ContextManager");

    let agent_id = AgentID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: vec![],
    };

    // First transaction built against an agent with no contexts.
    let first = agent_data
        .next_transaction(&context_manager)
        .message(MessageParam {
            role: MessageRole::User,
            content: "Message 1".into(),
        })
        .build();
    assert_eq!((first.context_seq_no, first.transaction_seq_no), (1, 1));

    // `build()` does not record anything, so the context is inserted by hand
    // to give the next builder something to continue from.
    let seeded_context = AgentContext {
        agent_id,
        context_seq_no: 1,
        transactions: vec![first],
    };
    agent_data.contexts.push(seeded_context);

    // Next transaction in the same context.
    let second = agent_data
        .next_transaction(&context_manager)
        .message(MessageParam {
            role: MessageRole::Assistant,
            content: "Message 2".into(),
        })
        .build();
    assert_eq!((second.context_seq_no, second.transaction_seq_no), (1, 2));

    // First transaction of a brand-new context.
    let third = agent_data
        .new_context(&context_manager)
        .message(MessageParam {
            role: MessageRole::User,
            content: "Message 3".into(),
        })
        .build();
    assert_eq!((third.context_seq_no, third.transaction_seq_no), (2, 1));
}
/// Saves three files in one transaction and exercises the read-side helpers
/// on `AgentData`: `get_file_content`, `list_files` and
/// `search_file_contents`.
#[tokio::test]
async fn agent_data_file_reading() {
    let client = create_test_client().await;
    client
        .heartbeat()
        .await
        .expect("Chroma heartbeat failed; ensure environment is configured");

    // Randomised collection name so this test gets its own collection.
    let collection_name = format!("test_agent_file_reading_{}", rand::random::<u32>());
    let collection = client
        .get_or_create_collection(&collection_name, None, None)
        .await
        .unwrap();
    let context_manager = ContextManager::new(collection).unwrap();

    let agent_id = AgentID::generate().unwrap();
    let mount_id = crate::MountID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: Vec::new(),
    };

    let _nonce = agent_data
        .new_context(&context_manager)
        .write_file(mount_id, "/file1.txt", "First file content")
        .unwrap()
        .write_file(mount_id, "/file2.txt", "Second file content")
        .unwrap()
        .write_file(mount_id, "/subdir/file3.txt", "Third file content")
        .unwrap()
        .save()
        .await
        .unwrap();

    // Direct lookups: an existing path yields its content, an unknown one `None`.
    let present = agent_data.get_file_content(mount_id, "/file1.txt").unwrap();
    assert_eq!(present.as_deref(), Some("First file content"));
    let absent = agent_data
        .get_file_content(mount_id, "/nonexistent.txt")
        .unwrap();
    assert!(absent.is_none());

    // Listing contains exactly the three written paths.
    let listing = agent_data.list_files(mount_id);
    assert_eq!(listing.len(), 3);
    for expected in ["/file1.txt", "/file2.txt", "/subdir/file3.txt"] {
        assert!(listing.contains(&expected.to_string()));
    }

    // "file" occurs in every file's content; "Third" only in file3.
    let broad = agent_data.search_file_contents(mount_id, "file").unwrap();
    assert_eq!(broad.len(), 3);
    assert!(broad.iter().any(|(path, _)| path == "/file1.txt"));

    let narrow = agent_data.search_file_contents(mount_id, "Third").unwrap();
    assert_eq!(narrow.len(), 1);
    assert_eq!(narrow[0].0, "/subdir/file3.txt");
}
/// Exercises the filesystem helpers exposed on the transaction builder:
/// `view_file`, `list_files`, `search_files`, `str_replace_file` (success and
/// both failure modes) and `insert_file`.
#[tokio::test]
async fn transaction_builder_filesystem_methods() {
    let client = create_test_client().await;
    client
        .heartbeat()
        .await
        .expect("Chroma heartbeat failed; ensure environment is configured");

    let collection_name = format!("test_builder_fs_{}", rand::random::<u32>());
    let collection = client
        .get_or_create_collection(&collection_name, None, None)
        .await
        .unwrap();
    let context_manager = ContextManager::new(collection).unwrap();

    let agent_id = AgentID::generate().unwrap();
    let mount_id = crate::MountID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: Vec::new(),
    };

    // Seed a single saved file for the builders below to read and edit.
    let _nonce = agent_data
        .new_context(&context_manager)
        .write_file(mount_id, "/original.txt", "Original content")
        .unwrap()
        .save()
        .await
        .unwrap();

    // Read-only helpers on a fresh builder see the saved file.
    let reader = agent_data.next_transaction(&context_manager);
    assert_eq!(
        reader.view_file(mount_id, "/original.txt").as_deref(),
        Some("Original content")
    );
    assert!(reader.view_file(mount_id, "/nonexistent.txt").is_none());

    let listing = reader.list_files(mount_id);
    assert_eq!(listing.len(), 1);
    assert!(listing.contains(&"/original.txt".to_string()));

    let hits = reader.search_files(mount_id, "Original");
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].0, "/original.txt");

    // A successful replacement shows up in the builder's buffered content.
    let edited = reader
        .str_replace_file(mount_id, "/original.txt", "Original", "Modified")
        .unwrap();
    let buffered = edited
        .get_buffered_content(mount_id, "/original.txt")
        .unwrap();
    assert_eq!(buffered.as_deref(), Some("Modified content"));

    // Replacing inside a file that does not exist is an error.
    let missing_file = agent_data
        .next_transaction(&context_manager)
        .str_replace_file(mount_id, "/nonexistent.txt", "old", "new");
    assert!(missing_file.is_err());
    if let Err(error) = missing_file {
        assert!(error.to_string().contains("not found"));
    }

    // Replacing text that the file does not contain is also an error.
    let missing_text = agent_data
        .next_transaction(&context_manager)
        .str_replace_file(mount_id, "/original.txt", "nonexistent", "new");
    assert!(missing_text.is_err());
    if let Err(error) = missing_text {
        assert!(error.to_string().contains("not found"));
    }

    // Inserting a new file makes its content visible in the buffer.
    let inserter = agent_data
        .next_transaction(&context_manager)
        .insert_file(mount_id, "/new_file.txt", "New file content")
        .unwrap();
    let inserted = inserter
        .get_buffered_content(mount_id, "/new_file.txt")
        .unwrap();
    assert_eq!(inserted.as_deref(), Some("New file content"));
}
/// Writes the same path three times within one builder and checks that the
/// buffered content and the content readable after saving are both the last
/// write, while the write summary still lists all three writes.
#[tokio::test]
async fn write_buffering_latest_wins() {
    let client = create_test_client().await;
    client
        .heartbeat()
        .await
        .expect("Chroma heartbeat failed; ensure environment is configured");

    let collection_name = format!("test_write_buffering_{}", rand::random::<u32>());
    let collection = client
        .get_or_create_collection(&collection_name, None, None)
        .await
        .unwrap();
    let context_manager = ContextManager::new(collection).unwrap();

    let agent_id = AgentID::generate().unwrap();
    let mount_id = crate::MountID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: Vec::new(),
    };

    // Apply the three writes to the same path, in order, on one builder.
    let mut builder = agent_data.new_context(&context_manager);
    for content in ["First write", "Second write", "Third write"] {
        builder = builder
            .write_file(mount_id, "/multi_write.txt", content)
            .unwrap();
    }

    // The buffer reports the most recent write.
    let buffered = builder
        .get_buffered_content(mount_id, "/multi_write.txt")
        .unwrap();
    assert_eq!(buffered.as_deref(), Some("Third write"));

    // The summary still has one entry per write, all for the same path.
    let summary = builder.get_write_summary();
    assert_eq!(summary.len(), 3);
    assert!(
        summary
            .iter()
            .all(|(_, path, _)| path == "/multi_write.txt")
    );

    // After saving, reading the file yields the last write as well.
    let _nonce = builder.save().await.unwrap();
    let saved = agent_data
        .get_file_content(mount_id, "/multi_write.txt")
        .unwrap();
    assert_eq!(saved.as_deref(), Some("Third write"));
}
/// Overwrites one file across three saved transactions and checks that
/// listings, content lookups and searches on `AgentData` reflect only the
/// latest version, with each path appearing once. The same is then checked on
/// a builder holding a further, unsaved write.
#[tokio::test]
async fn multiple_writes_same_file_appears_once() {
    let client = create_test_client().await;
    client
        .heartbeat()
        .await
        .expect("Chroma heartbeat failed; ensure environment is configured");

    let collection_name = format!("test_multiple_writes_{}", rand::random::<u32>());
    let collection = client
        .get_or_create_collection(&collection_name, None, None)
        .await
        .unwrap();
    let context_manager = ContextManager::new(collection).unwrap();

    let agent_id = AgentID::generate().unwrap();
    let mount_id = crate::MountID::generate().unwrap();
    let mut agent_data = AgentData {
        agent_id,
        contexts: Vec::new(),
    };

    // Transaction 1: initial versions of both files.
    let _nonce1 = agent_data
        .new_context(&context_manager)
        .write_file(mount_id, "/config.txt", "Version 1 content")
        .unwrap()
        .write_file(mount_id, "/other.txt", "Other file content")
        .unwrap()
        .save()
        .await
        .unwrap();

    // Transactions 2 and 3: overwrite only /config.txt.
    let _nonce2 = agent_data
        .next_transaction(&context_manager)
        .write_file(mount_id, "/config.txt", "Version 2 content")
        .unwrap()
        .save()
        .await
        .unwrap();
    let _nonce3 = agent_data
        .next_transaction(&context_manager)
        .write_file(mount_id, "/config.txt", "Version 3 final content")
        .unwrap()
        .save()
        .await
        .unwrap();

    // Listing: two distinct paths, each present exactly once.
    let listing = agent_data.list_files(mount_id);
    assert_eq!(listing.len(), 2);
    assert!(listing.contains(&"/config.txt".to_string()));
    assert!(listing.contains(&"/other.txt".to_string()));
    for name in ["config.txt", "other.txt"] {
        let full_path = format!("/{}", name);
        let occurrences = listing.iter().filter(|f| **f == full_path).count();
        assert_eq!(
            occurrences, 1,
            "{} should appear exactly once in file list",
            name
        );
    }

    // Content lookups return the newest version of each file.
    let config_content = agent_data
        .get_file_content(mount_id, "/config.txt")
        .unwrap();
    assert_eq!(config_content.as_deref(), Some("Version 3 final content"));
    let other_content = agent_data.get_file_content(mount_id, "/other.txt").unwrap();
    assert_eq!(other_content.as_deref(), Some("Other file content"));

    // Search hits the newest version only.
    let current_hits = agent_data.search_file_contents(mount_id, "final").unwrap();
    assert_eq!(current_hits.len(), 1, "Should find config.txt exactly once");
    assert_eq!(current_hits[0].0, "/config.txt");
    assert!(current_hits[0].1[0].contains("Version 3 final content"));

    // Superseded versions must no longer be searchable.
    for stale in ["Version 1", "Version 2"] {
        let stale_hits = agent_data.search_file_contents(mount_id, stale).unwrap();
        assert_eq!(
            stale_hits.len(),
            0,
            "Should not find old {} content",
            stale
        );
    }

    // A builder with a pending, unsaved overwrite shows the same behaviour.
    let pending = agent_data
        .next_transaction(&context_manager)
        .write_file(mount_id, "/config.txt", "Pending version")
        .unwrap();

    let pending_listing = pending.list_files(mount_id);
    let pending_config_count = pending_listing
        .iter()
        .filter(|f| *f == "/config.txt")
        .count();
    assert_eq!(
        pending_config_count, 1,
        "TransactionBuilder should show config.txt exactly once"
    );

    let pending_content = pending
        .get_buffered_content(mount_id, "/config.txt")
        .unwrap();
    assert_eq!(pending_content.as_deref(), Some("Pending version"));

    let pending_hits = pending.search_files(mount_id, "Pending");
    assert_eq!(
        pending_hits.len(),
        1,
        "Should find file with pending write exactly once"
    );
    assert_eq!(pending_hits[0].0, "/config.txt");
}
}