use crate::embeddings::{BoxedEmbeddingProvider, HashEmbedding};
use crate::error::{Result, RuvectorError};
use crate::types::*;
use crate::vector_db::VectorDB;
use parking_lot::RwLock;
use redb::{Database, TableDefinition};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// redb table of reflexion episodes: episode id -> JSON-encoded `ReflexionEpisode`.
const REFLEXION_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("reflexion_episodes");
/// redb table of skills: skill id -> bincode-encoded `Skill`.
const SKILLS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("skills_library");
/// redb table of causal edges: edge id -> bincode-encoded `CausalEdge`.
const CAUSAL_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("causal_edges");
/// redb table of learning sessions: session id -> bincode-encoded `LearningSession`.
const LEARNING_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("learning_sessions");
/// A recorded task attempt together with a critique of it.
///
/// `AgenticDB::store_episode` persists these as JSON and indexes them in the
/// vector store by the embedding of `critique`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReflexionEpisode {
/// Unique identifier (a UUID v4 string when created by `store_episode`).
pub id: String,
/// Description of the task that was attempted.
pub task: String,
/// Actions taken during the attempt.
pub actions: Vec<String>,
/// Observations recorded during the attempt.
pub observations: Vec<String>,
/// Critique text; this is the text that gets embedded for similarity search.
pub critique: String,
/// Embedding of `critique` produced by the database's embedding provider.
pub embedding: Vec<f32>,
/// Creation time as a Unix timestamp in seconds (UTC).
pub timestamp: i64,
/// Optional free-form metadata; `store_episode` leaves this as `None`.
pub metadata: Option<HashMap<String, serde_json::Value>>,
}
/// A reusable skill stored in the skills table (bincode-encoded) and indexed
/// in the vector store by the embedding of `description`.
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
pub struct Skill {
/// Unique identifier (a UUID v4 string when created by `AgenticDB::create_skill`).
pub id: String,
/// Human-readable skill name.
pub name: String,
/// Description text; this is the text that gets embedded for search.
pub description: String,
/// Caller-supplied string-to-string parameter map.
pub parameters: HashMap<String, String>,
/// Usage examples for the skill.
pub examples: Vec<String>,
/// Embedding of `description`.
pub embedding: Vec<f32>,
/// Usage counter; initialised to 0 by `create_skill`.
pub usage_count: usize,
/// Success rate; initialised to 0.0 by `create_skill`.
pub success_rate: f64,
/// Creation time as a Unix timestamp in seconds (UTC).
pub created_at: i64,
/// Last-update time as a Unix timestamp in seconds (UTC).
pub updated_at: i64,
}
/// A cause -> effect relationship stored in the causal table (bincode-encoded)
/// and indexed in the vector store by the embedding of `context`.
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
pub struct CausalEdge {
/// Unique identifier (a UUID v4 string when created by `AgenticDB::add_causal_edge`).
pub id: String,
// `causes` and `effects` are the two sides of the relationship; `confidence`
// is the caller-supplied confidence, which `add_causal_edge` also copies into
// the vector metadata so `query_with_utility` can use it as the causal uplift.
pub causes: Vec<String>, pub effects: Vec<String>, pub confidence: f64,
/// Context text; this is the text that gets embedded for search.
pub context: String,
/// Embedding of `context`.
pub embedding: Vec<f32>,
/// Observation counter; initialised to 1 by `add_causal_edge`.
pub observations: usize,
/// Creation time as a Unix timestamp in seconds (UTC).
pub timestamp: i64,
}
/// A learning session: its configuration plus every experience recorded so far.
///
/// Stored bincode-encoded in the learning table, keyed by `id`.
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
pub struct LearningSession {
/// Unique identifier (a UUID v4 string when created by `AgenticDB::start_session`).
pub id: String,
// `algorithm` is the caller-supplied algorithm name; `state_dim` is the
// caller-declared state dimensionality (not validated in this file).
pub algorithm: String, pub state_dim: usize,
/// Declared action dimensionality; used as the length of predicted action vectors.
pub action_dim: usize,
/// Experiences appended via `AgenticDB::add_experience`, in insertion order.
pub experiences: Vec<Experience>,
// `model_params` is an optional opaque byte blob (`None` for a new session);
// `created_at` is the creation time as a Unix timestamp in seconds (UTC).
pub model_params: Option<Vec<u8>>, pub created_at: i64,
/// Last-update time as a Unix timestamp in seconds (UTC).
pub updated_at: i64,
}
/// One recorded transition inside a `LearningSession`.
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
pub struct Experience {
/// State vector before the action.
pub state: Vec<f32>,
/// Action vector that was taken.
pub action: Vec<f32>,
/// Reward received for the action.
pub reward: f64,
/// State vector after the action.
pub next_state: Vec<f32>,
/// Caller-supplied flag stored with the transition (presumably "episode finished" — confirm with callers).
pub done: bool,
/// Time the experience was recorded, as a Unix timestamp in seconds (UTC).
pub timestamp: i64,
}
/// Result of `AgenticDB::predict_with_confidence`.
#[derive(Debug, Clone, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
pub struct Prediction {
/// Predicted action vector (length is the session's `action_dim`).
pub action: Vec<f32>,
/// Mean reward of the matching experiences minus 1.96 standard deviations.
pub confidence_lower: f64,
/// Mean reward of the matching experiences plus 1.96 standard deviations.
pub confidence_upper: f64,
/// Mean reward of the matching experiences.
pub mean_confidence: f64,
}
/// A vector search hit annotated with the score components computed by
/// `AgenticDB::query_with_utility`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UtilitySearchResult {
/// The underlying vector search hit.
pub result: SearchResult,
/// `alpha * similarity_score + beta * causal_uplift - latency_penalty`.
pub utility_score: f64,
/// `1 / (1 + score)` of the underlying hit.
pub similarity_score: f64,
/// The hit's `confidence` metadata value, or 0.0 when absent.
pub causal_uplift: f64,
/// Elapsed query time in seconds multiplied by `gamma`.
pub latency_penalty: f64,
}
/// Agent-oriented database combining a vector index with redb tables for
/// reflexion episodes, skills, causal edges and learning sessions.
pub struct AgenticDB {
// Vector index used for all similarity searches.
vector_db: Arc<VectorDB>,
// redb database (opened at "<storage_path>.agentic") holding the structured records.
db: Arc<Database>,
// Configured embedding dimensionality; stored but not read in this file.
_dimensions: usize,
// Provider used to turn text into embedding vectors.
embedding_provider: BoxedEmbeddingProvider,
}
impl AgenticDB {
/// Creates an `AgenticDB` that embeds text with a `HashEmbedding` sized to
/// `options.dimensions`.
///
/// Delegates to `with_embedding_provider`; any error it returns is propagated.
pub fn new(options: DbOptions) -> Result<Self> {
let embedding_provider = Arc::new(HashEmbedding::new(options.dimensions));
Self::with_embedding_provider(options, embedding_provider)
}
/// Creates an `AgenticDB` that uses the supplied embedding provider.
///
/// Opens the vector database described by `options`, creates (or opens) the
/// redb database at `"<storage_path>.agentic"`, and makes sure all four
/// tables exist.
///
/// # Errors
///
/// Returns `RuvectorError::InvalidDimension` when `options.dimensions` differs
/// from the provider's dimensionality, and propagates any vector-database or
/// redb error.
pub fn with_embedding_provider(
    options: DbOptions,
    embedding_provider: BoxedEmbeddingProvider,
) -> Result<Self> {
    let provider_dimensions = embedding_provider.dimensions();
    if options.dimensions != provider_dimensions {
        let message = format!(
            "Options dimensions ({}) do not match embedding provider dimensions ({})",
            options.dimensions, provider_dimensions
        );
        return Err(RuvectorError::InvalidDimension(message));
    }

    let vector_db = Arc::new(VectorDB::new(options.clone())?);

    let agentic_path = format!("{}.agentic", options.storage_path);
    let db = Arc::new(Database::create(&agentic_path)?);

    // Opening a table inside a write transaction creates it if it is missing.
    let setup_txn = db.begin_write()?;
    for definition in [REFLEXION_TABLE, SKILLS_TABLE, CAUSAL_TABLE, LEARNING_TABLE] {
        drop(setup_txn.open_table(definition)?);
    }
    setup_txn.commit()?;

    Ok(Self {
        vector_db,
        db,
        _dimensions: options.dimensions,
        embedding_provider,
    })
}
/// Creates an `AgenticDB` with default options except for `dimensions`.
///
/// Equivalent to calling `new` with `DbOptions { dimensions, ..Default::default() }`.
pub fn with_dimensions(dimensions: usize) -> Result<Self> {
let options = DbOptions {
dimensions,
..DbOptions::default()
};
Self::new(options)
}
/// Returns the name reported by the configured embedding provider.
pub fn embedding_provider_name(&self) -> &str {
self.embedding_provider.name()
}
/// Inserts a single entry into the underlying vector database and returns its id.
pub fn insert(&self, entry: VectorEntry) -> Result<VectorId> {
self.vector_db.insert(entry)
}
/// Inserts several entries into the underlying vector database and returns their ids.
pub fn insert_batch(&self, entries: Vec<VectorEntry>) -> Result<Vec<VectorId>> {
self.vector_db.insert_batch(entries)
}
/// Runs `query` against the underlying vector database and returns its hits.
pub fn search(&self, query: SearchQuery) -> Result<Vec<SearchResult>> {
self.vector_db.search(query)
}
/// Deletes the vector entry with the given id, forwarding the underlying
/// database's boolean result.
pub fn delete(&self, id: &str) -> Result<bool> {
self.vector_db.delete(id)
}
/// Looks up the vector entry with the given id in the underlying vector database.
pub fn get(&self, id: &str) -> Result<Option<VectorEntry>> {
self.vector_db.get(id)
}
/// Records a reflexion episode and returns its newly generated id.
///
/// The episode is written as JSON to the reflexion table and then indexed in
/// the vector store under `"reflexion_<id>"`, using the embedding of
/// `critique` and metadata `type = "reflexion"` / `episode_id = <id>`.
///
/// # Errors
///
/// Propagates embedding, serialization, redb and vector-database errors.
pub fn store_episode(
    &self,
    task: String,
    actions: Vec<String>,
    observations: Vec<String>,
    critique: String,
) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    let embedding = self.generate_text_embedding(&critique)?;

    let episode = ReflexionEpisode {
        id: id.clone(),
        task,
        actions,
        observations,
        critique,
        embedding: embedding.clone(),
        timestamp: chrono::Utc::now().timestamp(),
        metadata: None,
    };

    // Persist the full record first; the vector entry only carries a pointer to it.
    let txn = self.db.begin_write()?;
    {
        let mut episodes = txn.open_table(REFLEXION_TABLE)?;
        let encoded = serde_json::to_vec(&episode)
            .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
        episodes.insert(id.as_str(), encoded.as_slice())?;
    }
    txn.commit()?;

    let index_metadata = HashMap::from([
        ("type".to_string(), serde_json::json!("reflexion")),
        ("episode_id".to_string(), serde_json::json!(id.clone())),
    ]);
    self.vector_db.insert(VectorEntry {
        id: Some(format!("reflexion_{}", id)),
        vector: embedding,
        metadata: Some(index_metadata),
    })?;

    Ok(id)
}
/// Returns up to `k` stored episodes whose critique embedding is closest to
/// the embedding of `query`.
///
/// Hits are restricted to vector entries tagged `type = "reflexion"`. A hit
/// is skipped when it has no string `episode_id` in its metadata or when the
/// referenced episode is no longer present in the reflexion table.
///
/// # Errors
///
/// Propagates embedding, vector-database and redb errors, and returns
/// `RuvectorError::SerializationError` if a stored episode cannot be decoded.
pub fn retrieve_similar_episodes(
    &self,
    query: &str,
    k: usize,
) -> Result<Vec<ReflexionEpisode>> {
    let query_embedding = self.generate_text_embedding(query)?;
    let results = self.vector_db.search(SearchQuery {
        vector: query_embedding,
        k,
        filter: Some(HashMap::from([(
            "type".to_string(),
            serde_json::json!("reflexion"),
        )])),
        ef_search: None,
    })?;

    let read_txn = self.db.begin_read()?;
    let table = read_txn.open_table(REFLEXION_TABLE)?;

    let mut episodes = Vec::with_capacity(results.len());
    for result in &results {
        // A malformed (non-string) id is skipped instead of panicking.
        let episode_id = result
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("episode_id"))
            .and_then(|value| value.as_str());
        if let Some(id) = episode_id {
            if let Some(data) = table.get(id)? {
                let episode: ReflexionEpisode = serde_json::from_slice(data.value())
                    .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
                episodes.push(episode);
            }
        }
    }
    Ok(episodes)
}
/// Creates a skill and returns its newly generated id.
///
/// The skill is bincode-encoded into the skills table and then indexed in the
/// vector store under `"skill_<id>"`, using the embedding of `description`
/// and metadata `type = "skill"` / `skill_id = <id>`. `usage_count` and
/// `success_rate` start at zero.
///
/// # Errors
///
/// Propagates embedding, serialization, redb and vector-database errors.
pub fn create_skill(
    &self,
    name: String,
    description: String,
    parameters: HashMap<String, String>,
    examples: Vec<String>,
) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    let embedding = self.generate_text_embedding(&description)?;
    // Read the clock once so a fresh skill always has created_at == updated_at.
    let now = chrono::Utc::now().timestamp();

    let skill = Skill {
        id: id.clone(),
        name,
        description,
        parameters,
        examples,
        embedding: embedding.clone(),
        usage_count: 0,
        success_rate: 0.0,
        created_at: now,
        updated_at: now,
    };

    let write_txn = self.db.begin_write()?;
    {
        let mut table = write_txn.open_table(SKILLS_TABLE)?;
        let data = bincode::encode_to_vec(&skill, bincode::config::standard())
            .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
        table.insert(id.as_str(), data.as_slice())?;
    }
    write_txn.commit()?;

    self.vector_db.insert(VectorEntry {
        id: Some(format!("skill_{}", id)),
        vector: embedding,
        metadata: Some(HashMap::from([
            ("type".to_string(), serde_json::json!("skill")),
            ("skill_id".to_string(), serde_json::json!(id.as_str())),
        ])),
    })?;

    Ok(id)
}
/// Returns up to `k` skills whose description embedding is closest to the
/// embedding of `query_description`.
///
/// Hits are restricted to vector entries tagged `type = "skill"`. A hit is
/// skipped when it has no string `skill_id` in its metadata or when the
/// referenced skill is no longer present in the skills table.
///
/// # Errors
///
/// Propagates embedding, vector-database and redb errors, and returns
/// `RuvectorError::SerializationError` if a stored skill cannot be decoded.
pub fn search_skills(&self, query_description: &str, k: usize) -> Result<Vec<Skill>> {
    let query_embedding = self.generate_text_embedding(query_description)?;
    let results = self.vector_db.search(SearchQuery {
        vector: query_embedding,
        k,
        filter: Some(HashMap::from([(
            "type".to_string(),
            serde_json::json!("skill"),
        )])),
        ef_search: None,
    })?;

    let read_txn = self.db.begin_read()?;
    let table = read_txn.open_table(SKILLS_TABLE)?;

    let mut skills = Vec::with_capacity(results.len());
    for result in &results {
        // A malformed (non-string) id is skipped instead of panicking.
        let skill_id = result
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("skill_id"))
            .and_then(|value| value.as_str());
        if let Some(id) = skill_id {
            if let Some(data) = table.get(id)? {
                let (skill, _): (Skill, usize) =
                    bincode::decode_from_slice(data.value(), bincode::config::standard())
                        .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
                skills.push(skill);
            }
        }
    }
    Ok(skills)
}
/// Turns every sufficiently long action sequence into a new skill.
///
/// A sequence qualifies when its length is at least `success_threshold`.
/// Each qualifying sequence becomes a skill named `"Auto-Skill-<uuid>"` whose
/// description is `"Skill: "` followed by the steps joined with `" -> "`,
/// with no parameters and the steps themselves as examples.
///
/// Returns the ids of the created skills in input order. Processing stops at
/// the first `create_skill` error, which is returned.
pub fn auto_consolidate(
    &self,
    action_sequences: Vec<Vec<String>>,
    success_threshold: usize,
) -> Result<Vec<String>> {
    action_sequences
        .into_iter()
        .filter(|steps| steps.len() >= success_threshold)
        .map(|steps| {
            let description = format!("Skill: {}", steps.join(" -> "));
            let name = format!("Auto-Skill-{}", uuid::Uuid::new_v4());
            self.create_skill(name, description, HashMap::new(), steps)
        })
        .collect()
}
/// Records a causal edge and returns its newly generated id.
///
/// The edge is bincode-encoded into the causal table (with `observations`
/// set to 1) and then indexed in the vector store under `"causal_<id>"`,
/// using the embedding of `context` and metadata `type = "causal"`,
/// `causal_id = <id>` and `confidence`.
///
/// # Errors
///
/// Propagates embedding, serialization, redb and vector-database errors.
pub fn add_causal_edge(
    &self,
    causes: Vec<String>,
    effects: Vec<String>,
    confidence: f64,
    context: String,
) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    let embedding = self.generate_text_embedding(&context)?;

    let edge = CausalEdge {
        id: id.clone(),
        causes,
        effects,
        confidence,
        context,
        embedding: embedding.clone(),
        observations: 1,
        timestamp: chrono::Utc::now().timestamp(),
    };

    // Persist the full record first; the vector entry only carries a pointer to it.
    let txn = self.db.begin_write()?;
    {
        let mut edges = txn.open_table(CAUSAL_TABLE)?;
        let encoded = bincode::encode_to_vec(&edge, bincode::config::standard())
            .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
        edges.insert(id.as_str(), encoded.as_slice())?;
    }
    txn.commit()?;

    let index_metadata = HashMap::from([
        ("type".to_string(), serde_json::json!("causal")),
        ("causal_id".to_string(), serde_json::json!(id.clone())),
        ("confidence".to_string(), serde_json::json!(confidence)),
    ]);
    self.vector_db.insert(VectorEntry {
        id: Some(format!("causal_{}", id)),
        vector: embedding,
        metadata: Some(index_metadata),
    })?;

    Ok(id)
}
/// Searches causal edges and ranks them by a combined utility score.
///
/// For each hit the score is
/// `alpha * similarity + beta * causal_uplift - latency_penalty`, where
/// `similarity = 1 / (1 + hit.score)`, `causal_uplift` is the hit's
/// `confidence` metadata (0.0 when absent) and
/// `latency_penalty = elapsed_seconds * gamma`.
///
/// Twice `k` candidates are fetched (saturating at `usize::MAX`), sorted by
/// descending utility, and the best `k` are returned.
///
/// # Errors
///
/// Propagates embedding and vector-database errors.
pub fn query_with_utility(
    &self,
    query: &str,
    k: usize,
    alpha: f64,
    beta: f64,
    gamma: f64,
) -> Result<Vec<UtilitySearchResult>> {
    let start_time = std::time::Instant::now();
    let query_embedding = self.generate_text_embedding(query)?;
    let results = self.vector_db.search(SearchQuery {
        vector: query_embedding,
        // Over-fetch so re-ranking has candidates to choose from.
        k: k.saturating_mul(2),
        filter: Some(HashMap::from([(
            "type".to_string(),
            serde_json::json!("causal"),
        )])),
        ef_search: None,
    })?;

    // Measure the query latency once: every hit came from the same search, so
    // they must share the same penalty. Re-reading the clock per hit would
    // penalise later hits more and bias the ranking by iteration order.
    let latency_penalty = start_time.elapsed().as_secs_f64() * gamma;

    let mut utility_results: Vec<UtilitySearchResult> = results
        .into_iter()
        .map(|result| {
            let similarity_score = 1.0 / (1.0 + result.score as f64);
            let causal_uplift = result
                .metadata
                .as_ref()
                .and_then(|metadata| metadata.get("confidence"))
                .and_then(|value| value.as_f64())
                .unwrap_or(0.0);
            let utility_score =
                alpha * similarity_score + beta * causal_uplift - latency_penalty;
            UtilitySearchResult {
                result,
                utility_score,
                similarity_score,
                causal_uplift,
                latency_penalty,
            }
        })
        .collect();

    // `total_cmp` is a total order, so NaN scores (e.g. from NaN weights)
    // cannot panic the sort.
    utility_results.sort_by(|a, b| b.utility_score.total_cmp(&a.utility_score));
    utility_results.truncate(k);
    Ok(utility_results)
}
/// Creates an empty learning session and returns its newly generated id.
///
/// The session starts with no experiences and no model parameters and is
/// bincode-encoded into the learning table.
///
/// # Errors
///
/// Propagates serialization and redb errors.
pub fn start_session(
    &self,
    algorithm: String,
    state_dim: usize,
    action_dim: usize,
) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    // Read the clock once so a fresh session always has created_at == updated_at.
    let now = chrono::Utc::now().timestamp();

    let session = LearningSession {
        id: id.clone(),
        algorithm,
        state_dim,
        action_dim,
        experiences: Vec::new(),
        model_params: None,
        created_at: now,
        updated_at: now,
    };

    let write_txn = self.db.begin_write()?;
    {
        let mut table = write_txn.open_table(LEARNING_TABLE)?;
        let data = bincode::encode_to_vec(&session, bincode::config::standard())
            .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
        table.insert(id.as_str(), data.as_slice())?;
    }
    write_txn.commit()?;

    Ok(id)
}
pub fn add_experience(
&self,
session_id: &str,
state: Vec<f32>,
action: Vec<f32>,
reward: f64,
next_state: Vec<f32>,
done: bool,
) -> Result<()> {
let read_txn = self.db.begin_read()?;
let table = read_txn.open_table(LEARNING_TABLE)?;
let data = table
.get(session_id)?
.ok_or_else(|| RuvectorError::VectorNotFound(session_id.to_string()))?;
let (mut session, _): (LearningSession, usize) =
bincode::decode_from_slice(data.value(), bincode::config::standard())
.map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
drop(table);
drop(read_txn);
session.experiences.push(Experience {
state,
action,
reward,
next_state,
done,
timestamp: chrono::Utc::now().timestamp(),
});
session.updated_at = chrono::Utc::now().timestamp();
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(LEARNING_TABLE)?;
let data = bincode::encode_to_vec(&session, bincode::config::standard())
.map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
table.insert(session_id, data.as_slice())?;
}
write_txn.commit()?;
Ok(())
}
pub fn predict_with_confidence(&self, session_id: &str, state: Vec<f32>) -> Result<Prediction> {
let read_txn = self.db.begin_read()?;
let table = read_txn.open_table(LEARNING_TABLE)?;
let data = table
.get(session_id)?
.ok_or_else(|| RuvectorError::VectorNotFound(session_id.to_string()))?;
let (session, _): (LearningSession, usize) =
bincode::decode_from_slice(data.value(), bincode::config::standard())
.map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
let mut similar_actions = Vec::new();
let mut rewards = Vec::new();
for exp in &session.experiences {
let distance = euclidean_distance(&state, &exp.state);
if distance < 1.0 {
similar_actions.push(exp.action.clone());
rewards.push(exp.reward);
}
}
if similar_actions.is_empty() {
return Ok(Prediction {
action: vec![0.0; session.action_dim],
confidence_lower: 0.0,
confidence_upper: 0.0,
mean_confidence: 0.0,
});
}
let total_reward: f64 = rewards.iter().sum();
let mut action = vec![0.0; session.action_dim];
for (act, reward) in similar_actions.iter().zip(rewards.iter()) {
let weight = reward / total_reward;
for (i, val) in act.iter().enumerate() {
action[i] += val * weight as f32;
}
}
let mean_reward = total_reward / rewards.len() as f64;
let std_dev = calculate_std_dev(&rewards, mean_reward);
Ok(Prediction {
action,
confidence_lower: mean_reward - 1.96 * std_dev,
confidence_upper: mean_reward + 1.96 * std_dev,
mean_confidence: mean_reward,
})
}
/// Loads the learning session stored under `session_id`.
///
/// Returns `Ok(None)` when no such session exists.
///
/// # Errors
///
/// Returns `RuvectorError::SerializationError` when the stored bytes cannot be
/// decoded, and propagates redb errors.
pub fn get_session(&self, session_id: &str) -> Result<Option<LearningSession>> {
    let read_txn = self.db.begin_read()?;
    let sessions = read_txn.open_table(LEARNING_TABLE)?;

    let stored = sessions.get(session_id)?;
    match stored {
        None => Ok(None),
        Some(bytes) => {
            let (session, _): (LearningSession, usize) =
                bincode::decode_from_slice(bytes.value(), bincode::config::standard())
                    .map_err(|e| RuvectorError::SerializationError(e.to_string()))?;
            Ok(Some(session))
        }
    }
}
/// Embeds `text` with the configured embedding provider.
fn generate_text_embedding(&self, text: &str) -> Result<Vec<f32>> {
self.embedding_provider.embed(text)
}
}
/// Euclidean (L2) distance between `a` and `b`.
///
/// Components are paired positionally; if the slices differ in length, only
/// the common prefix is compared. Two empty slices have distance 0.
fn euclidean_distance(a: &[f32], b: &[f32]) -> f32 {
    let squared_sum: f32 = a
        .iter()
        .zip(b)
        .map(|(lhs, rhs)| {
            let delta = lhs - rhs;
            delta.powi(2)
        })
        .sum();
    squared_sum.sqrt()
}
/// Population standard deviation of `values` around the supplied `mean`.
///
/// Divides by `values.len()` (not `len - 1`). Returns 0.0 for an empty slice,
/// where the 0/0 division would otherwise produce NaN.
fn calculate_std_dev(values: &[f64], mean: f64) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let squared_deviations: f64 = values.iter().map(|x| (x - mean).powi(2)).sum();
    (squared_deviations / values.len() as f64).sqrt()
}
/// View over an `AgenticDB` for storing and retrieving policy entries
/// (state embedding -> action with reward and Q-value) in the vector store.
pub struct PolicyMemoryStore<'a> {
// Database whose vector store holds the policy entries.
db: &'a AgenticDB,
}
/// An action taken in some state, with its reward and Q-value estimate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyAction {
/// Name of the action.
pub action: String,
/// Reward recorded for the action.
pub reward: f64,
/// Q-value recorded for the action.
pub q_value: f64,
/// Embedding of the state the action was taken in.
pub state_embedding: Vec<f32>,
/// Unix timestamp in seconds associated with the action.
pub timestamp: i64,
}
/// A policy record: an action tied to a caller-supplied state identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyEntry {
/// Identifier of the policy entry.
pub id: String,
/// Caller-supplied identifier of the state.
pub state_id: String,
/// The action and its reward / Q-value.
pub action: PolicyAction,
/// Optional free-form metadata.
pub metadata: Option<HashMap<String, serde_json::Value>>,
}
impl<'a> PolicyMemoryStore<'a> {
/// Creates a policy store backed by `db`.
pub fn new(db: &'a AgenticDB) -> Self {
Self { db }
}
/// Stores a policy entry in the vector store and returns its generated id.
///
/// The entry is indexed under `"policy_<id>"` by `state_embedding`, with
/// metadata `type = "policy"`, `policy_id`, `state_id`, `action`, `reward`,
/// `q_value` and `timestamp` (Unix seconds, UTC). The timestamp is persisted
/// so the creation time is not lost once the call returns.
///
/// # Errors
///
/// Propagates vector-database errors.
pub fn store_policy(
    &self,
    state_id: &str,
    state_embedding: Vec<f32>,
    action: &str,
    reward: f64,
    q_value: f64,
) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    let timestamp = chrono::Utc::now().timestamp();

    let metadata = HashMap::from([
        ("type".to_string(), serde_json::json!("policy")),
        ("policy_id".to_string(), serde_json::json!(id.as_str())),
        ("state_id".to_string(), serde_json::json!(state_id)),
        ("action".to_string(), serde_json::json!(action)),
        ("reward".to_string(), serde_json::json!(reward)),
        ("q_value".to_string(), serde_json::json!(q_value)),
        ("timestamp".to_string(), serde_json::json!(timestamp)),
    ]);

    self.db.vector_db.insert(VectorEntry {
        id: Some(format!("policy_{}", id)),
        vector: state_embedding,
        metadata: Some(metadata),
    })?;

    Ok(id)
}
/// Returns up to `k` policy entries whose state embedding is closest to
/// `state_embedding`.
///
/// Entries are rebuilt from the vector metadata. Missing string fields become
/// `""`, missing numeric fields become `0.0`, and `timestamp` is read from
/// the `timestamp` metadata key when present (0 otherwise). Hits without
/// metadata are skipped.
///
/// # Errors
///
/// Propagates vector-database errors.
pub fn retrieve_similar_states(
    &self,
    state_embedding: &[f32],
    k: usize,
) -> Result<Vec<PolicyEntry>> {
    let results = self.db.vector_db.search(SearchQuery {
        vector: state_embedding.to_vec(),
        k,
        filter: Some(HashMap::from([(
            "type".to_string(),
            serde_json::json!("policy"),
        )])),
        ef_search: None,
    })?;

    let entries = results
        .into_iter()
        .filter_map(|result| {
            let metadata = result.metadata?;
            let text = |key: &str| -> String {
                metadata
                    .get(key)
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string()
            };
            let number =
                |key: &str| -> f64 { metadata.get(key).and_then(|v| v.as_f64()).unwrap_or(0.0) };

            Some(PolicyEntry {
                id: text("policy_id"),
                state_id: text("state_id"),
                action: PolicyAction {
                    action: text("action"),
                    reward: number("reward"),
                    q_value: number("q_value"),
                    state_embedding: result.vector.unwrap_or_default(),
                    // Entries without the key fall back to 0.
                    timestamp: metadata
                        .get("timestamp")
                        .and_then(|v| v.as_i64())
                        .unwrap_or(0),
                },
                metadata: None,
            })
        })
        .collect();

    Ok(entries)
}
/// Returns the action with the highest Q-value among the `k` policy entries
/// nearest to `state_embedding`, or `None` when there are no such entries.
///
/// Q-values are compared with `f64::total_cmp`, a total order, so the
/// comparison has no panic path.
///
/// # Errors
///
/// Propagates errors from `retrieve_similar_states`.
pub fn get_best_action(&self, state_embedding: &[f32], k: usize) -> Result<Option<String>> {
    let candidates = self.retrieve_similar_states(state_embedding, k)?;
    let best = candidates
        .into_iter()
        .max_by(|a, b| a.action.q_value.total_cmp(&b.action.q_value));
    Ok(best.map(|entry| entry.action.action))
}
pub fn update_q_value(&self, policy_id: &str, _new_q_value: f64) -> Result<()> {
let _ = self.db.vector_db.delete(&format!("policy_{}", policy_id));
Ok(())
}
}
/// View over an `AgenticDB` that indexes the turns of one conversation
/// session in the vector store, each with an expiry time.
pub struct SessionStateIndex<'a> {
// Database whose vector store holds the turns.
db: &'a AgenticDB,
// Session this index is scoped to; used in entry ids and search filters.
session_id: String,
// Lifetime of a turn in seconds, added to its creation timestamp.
ttl_seconds: i64,
}
/// One turn of a session, as rebuilt from vector-store metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionTurn {
/// Identifier of the turn (a UUID v4 string when created by `add_turn`).
pub id: String,
/// Session the turn belongs to.
pub session_id: String,
/// Caller-supplied position of the turn in the session.
pub turn_number: usize,
/// Caller-supplied role string for the turn.
pub role: String,
/// Text of the turn; this is the text that gets embedded.
pub content: String,
/// Embedding of `content`.
pub embedding: Vec<f32>,
/// Creation time as a Unix timestamp in seconds (UTC).
pub timestamp: i64,
/// Unix timestamp in seconds after which the turn is considered expired.
pub expires_at: i64,
}
impl<'a> SessionStateIndex<'a> {
/// Creates an index for `session_id` whose turns live for `ttl_seconds`.
pub fn new(db: &'a AgenticDB, session_id: &str, ttl_seconds: i64) -> Self {
Self {
db,
session_id: session_id.to_string(),
ttl_seconds,
}
}
/// Indexes one turn of the session and returns its generated turn id.
///
/// The turn is stored under `"session_<session_id>_<turn_id>"`, using the
/// embedding of `content`. Its metadata carries the session id, turn id,
/// turn number, role, content, creation timestamp and `expires_at`
/// (`timestamp + ttl_seconds`, saturating at the `i64` bounds so a very large
/// TTL cannot overflow).
///
/// # Errors
///
/// Propagates embedding and vector-database errors.
pub fn add_turn(&self, turn_number: usize, role: &str, content: &str) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    let timestamp = chrono::Utc::now().timestamp();
    let expires_at = timestamp.saturating_add(self.ttl_seconds);
    let embedding = self.db.generate_text_embedding(content)?;

    let metadata = HashMap::from([
        ("type".to_string(), serde_json::json!("session_turn")),
        (
            "session_id".to_string(),
            serde_json::json!(self.session_id.as_str()),
        ),
        ("turn_id".to_string(), serde_json::json!(id.as_str())),
        ("turn_number".to_string(), serde_json::json!(turn_number)),
        ("role".to_string(), serde_json::json!(role)),
        ("content".to_string(), serde_json::json!(content)),
        ("timestamp".to_string(), serde_json::json!(timestamp)),
        ("expires_at".to_string(), serde_json::json!(expires_at)),
    ]);

    self.db.vector_db.insert(VectorEntry {
        id: Some(format!("session_{}_{}", self.session_id, id)),
        vector: embedding,
        metadata: Some(metadata),
    })?;

    Ok(id)
}
/// Returns up to `k` non-expired turns of this session, nearest to `query`.
///
/// Twice `k` candidates are fetched (saturating at `usize::MAX`) to leave
/// room for expired turns, which are dropped. A turn is expired when its
/// `expires_at` metadata is earlier than the current time; a missing
/// `expires_at` counts as 0. Hits without metadata are skipped, and `k == 0`
/// always yields an empty list.
///
/// # Errors
///
/// Propagates embedding and vector-database errors.
pub fn find_relevant_turns(&self, query: &str, k: usize) -> Result<Vec<SessionTurn>> {
    let query_embedding = self.db.generate_text_embedding(query)?;
    let current_time = chrono::Utc::now().timestamp();

    let results = self.db.vector_db.search(SearchQuery {
        vector: query_embedding,
        k: k.saturating_mul(2),
        filter: Some(HashMap::from([
            ("type".to_string(), serde_json::json!("session_turn")),
            (
                "session_id".to_string(),
                serde_json::json!(self.session_id.as_str()),
            ),
        ])),
        ef_search: None,
    })?;

    let turns = results
        .into_iter()
        .filter_map(|result| {
            let metadata = result.metadata?;
            let expires_at = metadata
                .get("expires_at")
                .and_then(|v| v.as_i64())
                .unwrap_or(0);
            if expires_at < current_time {
                return None;
            }

            let text = |key: &str| -> String {
                metadata
                    .get(key)
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string()
            };

            Some(SessionTurn {
                id: text("turn_id"),
                session_id: self.session_id.clone(),
                // Checked conversion: a value that does not fit `usize` falls back to 0.
                turn_number: metadata
                    .get("turn_number")
                    .and_then(|v| v.as_u64())
                    .and_then(|n| usize::try_from(n).ok())
                    .unwrap_or(0),
                role: text("role"),
                content: text("content"),
                embedding: result.vector.unwrap_or_default(),
                timestamp: metadata
                    .get("timestamp")
                    .and_then(|v| v.as_i64())
                    .unwrap_or(0),
                expires_at,
            })
        })
        // `take` enforces the limit before anything is collected, so k == 0
        // cannot leak a single turn.
        .take(k)
        .collect();

    Ok(turns)
}
/// Returns up to 1000 non-expired turns of this session in ascending
/// `turn_number` order.
///
/// The turns are fetched via `find_relevant_turns` with an empty query.
pub fn get_session_context(&self) -> Result<Vec<SessionTurn>> {
    let mut ordered = self.find_relevant_turns("", 1000)?;
    // Stable sort: turns sharing a number keep their retrieval order.
    ordered.sort_by(|earlier, later| earlier.turn_number.cmp(&later.turn_number));
    Ok(ordered)
}
pub fn cleanup_expired(&self) -> Result<usize> {
let current_time = chrono::Utc::now().timestamp();
let all_turns = self.find_relevant_turns("", 10000)?;
let mut deleted = 0;
for turn in all_turns {
if turn.expires_at < current_time {
let _ = self
.db
.vector_db
.delete(&format!("session_{}_{}", self.session_id, turn.id));
deleted += 1;
}
}
Ok(deleted)
}
}
/// Hash-chained log of agent actions stored in an `AgenticDB`'s vector store.
///
/// Each appended entry records the hash of the entry appended before it
/// through this same `WitnessLog` value.
pub struct WitnessLog<'a> {
// Database whose vector store holds the log entries.
db: &'a AgenticDB,
// Hash of the most recently appended entry; `None` until the first append.
last_hash: RwLock<Option<String>>,
}
/// One entry of the witness log, as rebuilt from vector-store metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WitnessEntry {
/// Identifier of the entry (a UUID v4 string when created by `WitnessLog::append`).
pub id: String,
/// Hash of the preceding entry, or `None` for the first entry of a chain.
pub prev_hash: Option<String>,
/// Hash computed over `prev_hash`, `agent_id`, `action_type`, `details` and `timestamp`.
pub hash: String,
/// Identifier of the agent that performed the action.
pub agent_id: String,
/// Caller-supplied action category.
pub action_type: String,
/// Caller-supplied description of the action.
pub details: String,
/// Embedding of `"<agent_id> <action_type> <details>"`.
pub embedding: Vec<f32>,
/// Creation time as a Unix timestamp in seconds (UTC).
pub timestamp: i64,
/// Optional free-form metadata; `WitnessLog::search` leaves this as `None`.
pub metadata: Option<HashMap<String, serde_json::Value>>,
}
impl<'a> WitnessLog<'a> {
/// Creates a witness log backed by `db`, starting with no previous hash.
pub fn new(db: &'a AgenticDB) -> Self {
Self {
db,
last_hash: RwLock::new(None),
}
}
/// Computes the chain hash of an entry as a 16-digit lowercase hex string.
///
/// The hash covers, in order: the previous hash (only when present),
/// `agent_id`, `action_type`, `details` and `timestamp`.
///
/// NOTE(review): this uses `DefaultHasher`, whose algorithm the standard
/// library does not guarantee to keep stable across Rust releases, and it is
/// not a cryptographic hash. Confirm this is acceptable for persisted logs.
fn compute_hash(
    prev_hash: &Option<String>,
    agent_id: &str,
    action_type: &str,
    details: &str,
    timestamp: i64,
) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    // A missing predecessor contributes nothing to the hash.
    if let Some(previous) = prev_hash.as_deref() {
        previous.hash(&mut state);
    }
    for field in [agent_id, action_type, details] {
        field.hash(&mut state);
    }
    timestamp.hash(&mut state);

    let digest = state.finish();
    format!("{:016x}", digest)
}
/// Appends an entry to the log and returns its generated id.
///
/// The entry is chained to the previous one appended through this
/// `WitnessLog` and indexed in the vector store under `"witness_<id>"`, using
/// the embedding of `"<agent_id> <action_type> <details>"`.
///
/// The write lock on the chain head is held from reading the previous hash
/// until the new hash is recorded. Releasing it in between would let two
/// concurrent appends chain to the same predecessor and fork the log. The head
/// only advances after the entry has been stored.
///
/// # Errors
///
/// Propagates embedding and vector-database errors; on error the chain head
/// is left unchanged.
pub fn append(&self, agent_id: &str, action_type: &str, details: &str) -> Result<String> {
    let id = uuid::Uuid::new_v4().to_string();
    // Embed before taking the lock so the critical section stays short.
    let embedding = self
        .db
        .generate_text_embedding(&format!("{} {} {}", agent_id, action_type, details))?;

    let mut chain_head = self.last_hash.write();
    let timestamp = chrono::Utc::now().timestamp();
    let prev_hash: Option<String> = (*chain_head).clone();
    let hash = Self::compute_hash(&prev_hash, agent_id, action_type, details, timestamp);

    let mut metadata = HashMap::from([
        ("type".to_string(), serde_json::json!("witness")),
        ("witness_id".to_string(), serde_json::json!(id.as_str())),
        ("agent_id".to_string(), serde_json::json!(agent_id)),
        ("action_type".to_string(), serde_json::json!(action_type)),
        ("details".to_string(), serde_json::json!(details)),
        ("timestamp".to_string(), serde_json::json!(timestamp)),
        ("hash".to_string(), serde_json::json!(hash.as_str())),
    ]);
    if let Some(prev) = prev_hash.as_deref() {
        metadata.insert("prev_hash".to_string(), serde_json::json!(prev));
    }

    self.db.vector_db.insert(VectorEntry {
        id: Some(format!("witness_{}", id)),
        vector: embedding,
        metadata: Some(metadata),
    })?;

    *chain_head = Some(hash);
    Ok(id)
}
/// Returns up to `k` witness entries nearest to the embedding of `query`.
///
/// Entries are rebuilt from the vector metadata; missing string fields become
/// `""`, a missing timestamp becomes 0, and `prev_hash` is `None` when absent.
/// Hits without metadata are skipped.
///
/// # Errors
///
/// Propagates embedding and vector-database errors.
pub fn search(&self, query: &str, k: usize) -> Result<Vec<WitnessEntry>> {
    let query_embedding = self.db.generate_text_embedding(query)?;
    let witness_only = HashMap::from([("type".to_string(), serde_json::json!("witness"))]);
    let hits = self.db.vector_db.search(SearchQuery {
        vector: query_embedding,
        k,
        filter: Some(witness_only),
        ef_search: None,
    })?;

    let entries: Vec<WitnessEntry> = hits
        .into_iter()
        .filter_map(|hit| {
            let fields = hit.metadata?;
            // Reads a string field, defaulting to the empty string.
            let text = |key: &str| -> String {
                fields
                    .get(key)
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string()
            };

            Some(WitnessEntry {
                id: text("witness_id"),
                prev_hash: fields
                    .get("prev_hash")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string()),
                hash: text("hash"),
                agent_id: text("agent_id"),
                action_type: text("action_type"),
                details: text("details"),
                embedding: hit.vector.unwrap_or_default(),
                timestamp: fields
                    .get("timestamp")
                    .and_then(|v| v.as_i64())
                    .unwrap_or(0),
                metadata: None,
            })
        })
        .collect();

    Ok(entries)
}
/// Returns up to `k` witness entries recorded by `agent_id`.
///
/// Candidates come from a similarity search on the agent id, which on its own
/// can also return entries of other agents. They are therefore filtered to an
/// exact `agent_id` match. Twice `k` candidates are fetched (saturating) to
/// leave room for the ones that get filtered out, so fewer than `k` entries
/// may be returned even if more exist.
///
/// # Errors
///
/// Propagates errors from `search`.
pub fn get_by_agent(&self, agent_id: &str, k: usize) -> Result<Vec<WitnessEntry>> {
    let candidates = self.search(agent_id, k.saturating_mul(2))?;
    Ok(candidates
        .into_iter()
        .filter(|entry| entry.agent_id == agent_id)
        .take(k)
        .collect())
}
pub fn verify_chain(&self) -> Result<bool> {
let entries = self.search("", 10000)?;
let mut sorted_entries = entries;
sorted_entries.sort_by_key(|e| e.timestamp);
for i in 1..sorted_entries.len() {
let prev = &sorted_entries[i - 1];
let curr = &sorted_entries[i];
if let Some(ref prev_hash) = curr.prev_hash {
if prev_hash != &prev.hash {
return Ok(false);
}
}
}
Ok(true)
}
}
impl AgenticDB {
pub fn policy_memory(&self) -> PolicyMemoryStore<'_> {
PolicyMemoryStore::new(self)
}
pub fn session_index(&self, session_id: &str, ttl_seconds: i64) -> SessionStateIndex<'_> {
SessionStateIndex::new(self, session_id, ttl_seconds)
}
pub fn witness_log(&self) -> WitnessLog<'_> {
WitnessLog::new(self)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Builds a 128-dimensional database inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned with the database and must be kept
    /// alive for as long as the database is used: dropping it deletes the
    /// directory that holds the database files.
    fn create_test_db() -> Result<(TempDir, AgenticDB)> {
        let dir = tempdir().unwrap();
        let options = DbOptions {
            storage_path: dir.path().join("test.db").to_string_lossy().to_string(),
            dimensions: 128,
            ..DbOptions::default()
        };
        let db = AgenticDB::new(options)?;
        Ok((dir, db))
    }

    #[test]
    fn test_reflexion_episode() -> Result<()> {
        let (_dir, db) = create_test_db()?;
        let id = db.store_episode(
            "Solve math problem".to_string(),
            vec!["read problem".to_string(), "calculate".to_string()],
            vec!["got 42".to_string()],
            "Should have shown work".to_string(),
        )?;
        let episodes = db.retrieve_similar_episodes("math problem solving", 5)?;
        assert!(!episodes.is_empty());
        assert_eq!(episodes[0].id, id);
        Ok(())
    }

    #[test]
    fn test_skill_library() -> Result<()> {
        let (_dir, db) = create_test_db()?;
        let mut params = HashMap::new();
        params.insert("input".to_string(), "string".to_string());
        let skill_id = db.create_skill(
            "Parse JSON".to_string(),
            "Parse JSON from string".to_string(),
            params,
            vec!["json.parse()".to_string()],
        )?;
        let skills = db.search_skills("parse json data", 5)?;
        assert!(!skills.is_empty());
        // The only stored skill must be among the hits.
        assert!(skills.iter().any(|skill| skill.id == skill_id));
        Ok(())
    }

    #[test]
    fn test_causal_edge() -> Result<()> {
        let (_dir, db) = create_test_db()?;
        let edge_id = db.add_causal_edge(
            vec!["rain".to_string()],
            vec!["wet ground".to_string()],
            0.95,
            "Weather observation".to_string(),
        )?;
        assert!(!edge_id.is_empty());
        let results = db.query_with_utility("weather patterns", 5, 0.7, 0.2, 0.1)?;
        assert!(!results.is_empty());
        Ok(())
    }

    #[test]
    fn test_learning_session() -> Result<()> {
        let (_dir, db) = create_test_db()?;
        let session_id = db.start_session("Q-Learning".to_string(), 4, 2)?;
        db.add_experience(
            &session_id,
            vec![1.0, 0.0, 0.0, 0.0],
            vec![1.0, 0.0],
            1.0,
            vec![0.0, 1.0, 0.0, 0.0],
            false,
        )?;
        let prediction = db.predict_with_confidence(&session_id, vec![1.0, 0.0, 0.0, 0.0])?;
        assert_eq!(prediction.action.len(), 2);
        Ok(())
    }

    #[test]
    fn test_auto_consolidate() -> Result<()> {
        let (_dir, db) = create_test_db()?;
        let sequences = vec![
            vec![
                "step1".to_string(),
                "step2".to_string(),
                "step3".to_string(),
            ],
            vec![
                "action1".to_string(),
                "action2".to_string(),
                "action3".to_string(),
            ],
        ];
        let skill_ids = db.auto_consolidate(sequences, 3)?;
        assert_eq!(skill_ids.len(), 2);
        Ok(())
    }
}