pub mod credentials;
pub mod fault_injection;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use async_trait::async_trait;
use rust_decimal::Decimal;
use tokio::sync::{Mutex as AsyncMutex, mpsc};
use crate::agent::AgentDeps;
use crate::channels::{
Channel, ChannelManager, IncomingMessage, MessageStream, OutgoingResponse, StatusUpdate,
};
use crate::db::Database;
use crate::error::{ChannelError, LlmError};
use crate::llm::{
CompletionRequest, CompletionResponse, FinishReason, LlmProvider, ToolCompletionRequest,
ToolCompletionResponse,
};
use crate::tools::ToolRegistry;
#[cfg(feature = "libsql")]
pub async fn test_db() -> (Arc<dyn Database>, tempfile::TempDir) {
use crate::db::libsql::LibSqlBackend;
let dir = tempfile::tempdir().expect("failed to create temp dir");
let path = dir.path().join("test.db");
let backend = LibSqlBackend::new_local(&path)
.await
.expect("failed to create test LibSqlBackend");
backend
.run_migrations()
.await
.expect("failed to run migrations");
(Arc::new(backend) as Arc<dyn Database>, dir)
}
#[derive(Clone, Copy, Debug)]
pub enum StubErrorKind {
Transient,
NonTransient,
}
pub struct StubLlm {
model_name: String,
response: String,
call_count: AtomicU32,
should_fail: AtomicBool,
error_kind: StubErrorKind,
fault_injector: Option<Arc<fault_injection::FaultInjector>>,
}
impl StubLlm {
pub fn new(response: impl Into<String>) -> Self {
Self {
model_name: "stub-model".to_string(),
response: response.into(),
call_count: AtomicU32::new(0),
should_fail: AtomicBool::new(false),
error_kind: StubErrorKind::Transient,
fault_injector: None,
}
}
pub fn failing(name: impl Into<String>) -> Self {
Self {
model_name: name.into(),
response: String::new(),
call_count: AtomicU32::new(0),
should_fail: AtomicBool::new(true),
error_kind: StubErrorKind::Transient,
fault_injector: None,
}
}
pub fn failing_non_transient(name: impl Into<String>) -> Self {
Self {
model_name: name.into(),
response: String::new(),
call_count: AtomicU32::new(0),
should_fail: AtomicBool::new(true),
error_kind: StubErrorKind::NonTransient,
fault_injector: None,
}
}
pub fn with_model_name(mut self, name: impl Into<String>) -> Self {
self.model_name = name.into();
self
}
pub fn calls(&self) -> u32 {
self.call_count.load(Ordering::Relaxed)
}
pub fn with_fault_injector(mut self, injector: Arc<fault_injection::FaultInjector>) -> Self {
self.fault_injector = Some(injector);
self
}
pub fn set_failing(&self, fail: bool) {
self.should_fail.store(fail, Ordering::Relaxed);
}
async fn check_faults(&self) -> Option<LlmError> {
if let Some(ref injector) = self.fault_injector {
match injector.next_action() {
fault_injection::FaultAction::Fail(fault) => {
return Some(fault.to_llm_error(&self.model_name));
}
fault_injection::FaultAction::Delay(duration) => {
tokio::time::sleep(duration).await;
}
fault_injection::FaultAction::Succeed => {}
}
} else if self.should_fail.load(Ordering::Relaxed) {
return Some(self.make_error());
}
None
}
fn make_error(&self) -> LlmError {
match self.error_kind {
StubErrorKind::Transient => LlmError::RequestFailed {
provider: self.model_name.clone(),
reason: "server error".to_string(),
},
StubErrorKind::NonTransient => LlmError::ContextLengthExceeded {
used: 100_000,
limit: 50_000,
},
}
}
}
impl Default for StubLlm {
fn default() -> Self {
Self::new("OK")
}
}
#[async_trait]
impl LlmProvider for StubLlm {
fn model_name(&self) -> &str {
&self.model_name
}
fn cost_per_token(&self) -> (Decimal, Decimal) {
(Decimal::ZERO, Decimal::ZERO)
}
async fn complete(&self, _request: CompletionRequest) -> Result<CompletionResponse, LlmError> {
self.call_count.fetch_add(1, Ordering::Relaxed);
if let Some(err) = self.check_faults().await {
return Err(err);
}
Ok(CompletionResponse {
content: self.response.clone(),
input_tokens: 10,
output_tokens: 5,
finish_reason: FinishReason::Stop,
cache_read_input_tokens: 0,
cache_creation_input_tokens: 0,
})
}
async fn complete_with_tools(
&self,
_request: ToolCompletionRequest,
) -> Result<ToolCompletionResponse, LlmError> {
self.call_count.fetch_add(1, Ordering::Relaxed);
if let Some(err) = self.check_faults().await {
return Err(err);
}
Ok(ToolCompletionResponse {
content: Some(self.response.clone()),
tool_calls: Vec::new(),
input_tokens: 10,
output_tokens: 5,
finish_reason: FinishReason::Stop,
cache_read_input_tokens: 0,
cache_creation_input_tokens: 0,
})
}
}
pub struct StubChannel {
name: String,
rx: tokio::sync::Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
responses: Arc<Mutex<Vec<(IncomingMessage, OutgoingResponse)>>>,
statuses: Arc<Mutex<Vec<StatusUpdate>>>,
healthy: AtomicBool,
}
impl StubChannel {
pub fn new(name: impl Into<String>) -> (Self, mpsc::Sender<IncomingMessage>) {
let (tx, rx) = mpsc::channel(64);
let channel = Self {
name: name.into(),
rx: tokio::sync::Mutex::new(Some(rx)),
responses: Arc::new(Mutex::new(Vec::new())),
statuses: Arc::new(Mutex::new(Vec::new())),
healthy: AtomicBool::new(true),
};
(channel, tx)
}
pub fn captured_responses(&self) -> Vec<(IncomingMessage, OutgoingResponse)> {
self.responses.lock().expect("poisoned").clone()
}
pub fn captured_responses_handle(
&self,
) -> Arc<Mutex<Vec<(IncomingMessage, OutgoingResponse)>>> {
Arc::clone(&self.responses)
}
pub fn captured_statuses(&self) -> Vec<StatusUpdate> {
self.statuses.lock().expect("poisoned").clone()
}
pub fn captured_statuses_handle(&self) -> Arc<Mutex<Vec<StatusUpdate>>> {
Arc::clone(&self.statuses)
}
pub fn set_healthy(&self, healthy: bool) {
self.healthy.store(healthy, Ordering::Relaxed);
}
}
#[async_trait]
impl Channel for StubChannel {
fn name(&self) -> &str {
&self.name
}
async fn start(&self) -> Result<MessageStream, ChannelError> {
let rx = self
.rx
.lock()
.await
.take()
.ok_or_else(|| ChannelError::StartupFailed {
name: self.name.clone(),
reason: "start() already called".to_string(),
})?;
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(stream))
}
async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
self.responses
.lock()
.expect("poisoned")
.push((msg.clone(), response));
Ok(())
}
async fn send_status(
&self,
status: StatusUpdate,
_metadata: &serde_json::Value,
) -> Result<(), ChannelError> {
self.statuses.lock().expect("poisoned").push(status);
Ok(())
}
async fn health_check(&self) -> Result<(), ChannelError> {
if self.healthy.load(Ordering::Relaxed) {
Ok(())
} else {
Err(ChannelError::HealthCheckFailed {
name: self.name.clone(),
})
}
}
}
pub type BroadcastCapture = Arc<AsyncMutex<Vec<(String, OutgoingResponse)>>>;
pub struct RecordingBroadcastChannel {
name: &'static str,
captures: BroadcastCapture,
}
impl RecordingBroadcastChannel {
pub fn new(name: &'static str) -> (Self, BroadcastCapture) {
let captures = Arc::new(AsyncMutex::new(Vec::new()));
(
Self {
name,
captures: Arc::clone(&captures),
},
captures,
)
}
}
#[async_trait]
impl Channel for RecordingBroadcastChannel {
fn name(&self) -> &str {
self.name
}
async fn start(&self) -> Result<MessageStream, ChannelError> {
let (_tx, rx) = mpsc::channel::<IncomingMessage>(1);
Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
async fn respond(
&self,
_msg: &IncomingMessage,
_response: OutgoingResponse,
) -> Result<(), ChannelError> {
Ok(())
}
async fn send_status(
&self,
_status: StatusUpdate,
_metadata: &serde_json::Value,
) -> Result<(), ChannelError> {
Ok(())
}
async fn broadcast(
&self,
user_id: &str,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
self.captures
.lock()
.await
.push((user_id.to_string(), response));
Ok(())
}
async fn health_check(&self) -> Result<(), ChannelError> {
Ok(())
}
}
pub struct TestHarness {
pub deps: AgentDeps,
pub db: Arc<dyn Database>,
pub channel: Option<(mpsc::Sender<IncomingMessage>, ChannelManager)>,
#[cfg(feature = "libsql")]
_temp_dir: tempfile::TempDir,
}
pub struct TestHarnessBuilder {
db: Option<Arc<dyn Database>>,
llm: Option<Arc<dyn LlmProvider>>,
tools: Option<Arc<ToolRegistry>>,
stub_channel: bool,
}
impl TestHarnessBuilder {
pub fn new() -> Self {
Self {
db: None,
llm: None,
tools: None,
stub_channel: false,
}
}
pub fn with_db(mut self, db: Arc<dyn Database>) -> Self {
self.db = Some(db);
self
}
pub fn with_llm(mut self, llm: Arc<dyn LlmProvider>) -> Self {
self.llm = Some(llm);
self
}
pub fn with_tools(mut self, tools: Arc<ToolRegistry>) -> Self {
self.tools = Some(tools);
self
}
pub fn with_stub_channel(mut self) -> Self {
self.stub_channel = true;
self
}
#[cfg(feature = "libsql")]
pub async fn build(self) -> TestHarness {
use crate::agent::cost_guard::{CostGuard, CostGuardConfig};
use crate::config::{SafetyConfig, SkillsConfig};
use crate::hooks::HookRegistry;
use crate::safety::SafetyLayer;
let (db, temp_dir) = if let Some(db) = self.db {
let dir = tempfile::tempdir().expect("failed to create temp dir");
(db, dir)
} else {
test_db().await
};
let llm: Arc<dyn LlmProvider> = self.llm.unwrap_or_else(|| Arc::new(StubLlm::default()));
let tools = self.tools.unwrap_or_else(|| {
let t = Arc::new(ToolRegistry::new());
t.register_builtin_tools();
t
});
let safety = Arc::new(SafetyLayer::new(&SafetyConfig {
max_output_length: 100_000,
injection_check_enabled: false,
}));
let hooks = Arc::new(HookRegistry::new());
let cost_guard = Arc::new(CostGuard::new(CostGuardConfig {
max_cost_per_day_cents: None,
max_actions_per_hour: None,
}));
let channel = if self.stub_channel {
let (stub, sender) = StubChannel::new("stub");
let manager = ChannelManager::new();
manager.add(Box::new(stub)).await;
Some((sender, manager))
} else {
None
};
let deps = AgentDeps {
owner_id: "default".to_string(),
store: Some(Arc::clone(&db)),
llm,
cheap_llm: None,
safety,
tools,
workspace: None,
extension_manager: None,
skill_registry: None,
skill_catalog: None,
skills_config: SkillsConfig::default(),
hooks,
cost_guard,
sse_tx: None,
http_interceptor: None,
transcription: None,
document_extraction: None,
sandbox_readiness: crate::agent::routine_engine::SandboxReadiness::DisabledByConfig,
builder: None,
llm_backend: "nearai".to_string(),
};
TestHarness {
deps,
db,
channel,
_temp_dir: temp_dir,
}
}
}
impl Default for TestHarnessBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_harness_builds_with_defaults() {
let harness = TestHarnessBuilder::new().build().await;
assert!(harness.deps.store.is_some());
assert_eq!(harness.deps.llm.model_name(), "stub-model");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_harness_custom_llm() {
let custom_llm = Arc::new(StubLlm::new("custom response").with_model_name("my-model"));
let harness = TestHarnessBuilder::new().with_llm(custom_llm).build().await;
assert_eq!(harness.deps.llm.model_name(), "my-model");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_harness_db_works() {
let harness = TestHarnessBuilder::new().build().await;
let id = harness
.db
.create_conversation("test", "user1", None)
.await
.expect("create conversation");
assert!(!id.is_nil());
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_conversation_message_round_trip() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = db
.create_conversation("tui", "alice", None)
.await
.expect("create conversation");
let m1 = db
.add_conversation_message(conv_id, "user", "Hello!")
.await
.expect("add msg 1");
let m2 = db
.add_conversation_message(conv_id, "assistant", "Hi there!")
.await
.expect("add msg 2");
let m3 = db
.add_conversation_message(conv_id, "user", "How are you?")
.await
.expect("add msg 3");
assert_ne!(m1, m2);
assert_ne!(m2, m3);
let msgs = db
.list_conversation_messages(conv_id)
.await
.expect("list messages");
assert_eq!(msgs.len(), 3);
assert_eq!(msgs[0].role, "user");
assert_eq!(msgs[0].content, "Hello!");
assert_eq!(msgs[1].role, "assistant");
assert_eq!(msgs[1].content, "Hi there!");
assert_eq!(msgs[2].role, "user");
assert_eq!(msgs[2].content, "How are you?");
assert!(msgs[0].created_at <= msgs[1].created_at);
assert!(msgs[1].created_at <= msgs[2].created_at);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_conversation_metadata_persistence() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = db
.create_conversation("web", "bob", None)
.await
.expect("create conversation");
let meta = db
.get_conversation_metadata(conv_id)
.await
.expect("get metadata");
if let Some(m) = &meta {
assert!(m.is_null() || m.as_object().is_none_or(|o| o.is_empty()));
}
db.update_conversation_metadata_field(
conv_id,
"thread_type",
&serde_json::json!("assistant"),
)
.await
.expect("set thread_type");
let meta = db
.get_conversation_metadata(conv_id)
.await
.expect("get metadata after update")
.expect("metadata should exist");
assert_eq!(meta["thread_type"], "assistant");
db.update_conversation_metadata_field(conv_id, "model", &serde_json::json!("gpt-4"))
.await
.expect("set model");
let meta = db
.get_conversation_metadata(conv_id)
.await
.expect("get metadata after second update")
.expect("metadata should exist");
assert_eq!(meta["thread_type"], "assistant");
assert_eq!(meta["model"], "gpt-4");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_conversation_belongs_to_user() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = db
.create_conversation("tui", "alice", None)
.await
.expect("create conversation");
assert!(
db.conversation_belongs_to_user(conv_id, "alice")
.await
.expect("belongs check")
);
assert!(
!db.conversation_belongs_to_user(conv_id, "mallory")
.await
.expect("belongs check other user")
);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_ensure_conversation_idempotent() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = uuid::Uuid::new_v4();
assert!(
db.ensure_conversation(conv_id, "web", "carol", None)
.await
.expect("ensure first"),
"first ensure_conversation should create the row"
);
assert!(
db.ensure_conversation(conv_id, "web", "carol", None)
.await
.expect("ensure second (idempotent)"),
"second ensure_conversation should touch owned row"
);
let msg_id = db
.add_conversation_message(conv_id, "user", "test message")
.await
.expect("add message to ensured conversation");
assert!(!msg_id.is_nil());
let msgs = db
.list_conversation_messages(conv_id)
.await
.expect("list messages");
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].content, "test message");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_ensure_conversation_foreign_conflict_does_not_touch_last_activity() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = db
.create_conversation("web", "alice", None)
.await
.expect("create conversation");
let before = db
.list_conversations_all_channels("alice", 10)
.await
.expect("list conversations before foreign ensure")
.into_iter()
.find(|c| c.id == conv_id)
.expect("conversation must exist before foreign ensure")
.last_activity;
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
assert!(
!db.ensure_conversation(conv_id, "web", "mallory", None)
.await
.expect("foreign ensure should not error"),
"foreign ensure_conversation should report not ensured"
);
let after = db
.list_conversations_all_channels("alice", 10)
.await
.expect("list conversations after foreign ensure")
.into_iter()
.find(|c| c.id == conv_id)
.expect("conversation must still exist after foreign ensure")
.last_activity;
assert_eq!(
after, before,
"foreign ensure_conversation should not mutate last_activity"
);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_paginated_messages() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let conv_id = db
.create_conversation("tui", "dave", None)
.await
.expect("create conversation");
for i in 0..5 {
db.add_conversation_message(conv_id, "user", &format!("msg {i}"))
.await
.expect("add message");
}
let (page1, has_more) = db
.list_conversation_messages_paginated(conv_id, None, 3)
.await
.expect("page 1");
assert_eq!(page1.len(), 3, "first page should have 3 messages");
assert!(has_more, "should indicate more messages exist");
let (all, _) = db
.list_conversation_messages_paginated(conv_id, None, 100)
.await
.expect("all messages");
assert_eq!(all.len(), 5);
for w in all.windows(2) {
assert!(
w[0].created_at <= w[1].created_at,
"messages should be in ascending created_at order"
);
}
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_conversations_with_preview() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let c1 = db
.create_conversation("tui", "eve", None)
.await
.expect("create c1");
db.add_conversation_message(c1, "user", "First conversation opener")
.await
.expect("add msg to c1");
let c2 = db
.create_conversation("tui", "eve", None)
.await
.expect("create c2");
db.add_conversation_message(c2, "user", "Second conversation opener")
.await
.expect("add msg to c2");
let summaries = db
.list_conversations_with_preview("eve", "tui", 10)
.await
.expect("list with preview");
assert_eq!(summaries.len(), 2);
for s in &summaries {
assert!(s.message_count >= 1);
}
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_job_action_persistence() {
use crate::context::{ActionRecord, JobContext, JobState};
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let ctx = JobContext::with_user("user1", "Do something", "test task");
let job_id = ctx.job_id;
db.save_job(&ctx).await.expect("save job");
let fetched = db.get_job(job_id).await.expect("get job");
assert!(fetched.is_some());
let fetched = fetched.unwrap();
assert_eq!(fetched.job_id, job_id);
let action = ActionRecord {
id: uuid::Uuid::new_v4(),
sequence: 1,
tool_name: "echo".to_string(),
input: serde_json::json!({"message": "hello"}),
output_raw: Some("hello".to_string()),
output_sanitized: None,
sanitization_warnings: vec![],
cost: None,
duration: std::time::Duration::from_millis(42),
success: true,
error: None,
executed_at: chrono::Utc::now(),
};
db.save_action(job_id, &action).await.expect("save action");
let actions = db.get_job_actions(job_id).await.expect("get actions");
assert_eq!(actions.len(), 1);
assert_eq!(actions[0].tool_name, "echo");
assert_eq!(actions[0].output_raw, Some("hello".to_string()));
assert!(actions[0].success);
assert_eq!(actions[0].duration, std::time::Duration::from_millis(42));
db.update_job_status(job_id, JobState::Completed, None)
.await
.expect("update status");
let updated = db
.get_job(job_id)
.await
.expect("get updated job")
.expect("job should exist");
assert!(matches!(updated.state, JobState::Completed));
}
#[tokio::test]
async fn test_stub_llm_complete() {
let llm = StubLlm::new("hello world");
let response = llm
.complete(CompletionRequest::new(vec![]))
.await
.expect("complete");
assert_eq!(response.content, "hello world");
assert_eq!(response.finish_reason, FinishReason::Stop);
}
#[tokio::test]
async fn test_stub_channel_inject_and_capture() {
use futures::StreamExt;
let (channel, sender) = StubChannel::new("test-channel");
let mut stream = channel.start().await.expect("start failed");
sender
.send(IncomingMessage::new("test-channel", "user1", "hello"))
.await
.expect("send failed");
let msg = stream.next().await.expect("stream ended");
assert_eq!(msg.content, "hello");
assert_eq!(msg.user_id, "user1");
assert_eq!(msg.channel, "test-channel");
let response = OutgoingResponse::text("world");
channel
.respond(&msg, response)
.await
.expect("respond failed");
let captured = channel.captured_responses();
assert_eq!(captured.len(), 1);
assert_eq!(captured[0].1.content, "world");
}
#[tokio::test]
async fn test_stub_channel_health_check() {
let (channel, _sender) = StubChannel::new("healthy");
channel.health_check().await.expect("health check failed");
channel.set_healthy(false);
assert!(channel.health_check().await.is_err());
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_settings_crud() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let val = db.get_setting("user1", "theme").await.expect("get");
assert!(val.is_none());
db.set_setting("user1", "theme", &serde_json::json!("dark"))
.await
.expect("set");
let val = db
.get_setting("user1", "theme")
.await
.expect("get")
.expect("should exist");
assert_eq!(val, serde_json::json!("dark"));
db.set_setting("user1", "theme", &serde_json::json!("light"))
.await
.expect("set update");
let val = db
.get_setting("user1", "theme")
.await
.expect("get")
.expect("should exist");
assert_eq!(val, serde_json::json!("light"));
let all = db.list_settings("user1").await.expect("list");
assert_eq!(all.len(), 1);
let deleted = db.delete_setting("user1", "theme").await.expect("delete");
assert!(deleted);
let val = db.get_setting("user1", "theme").await.expect("get");
assert!(val.is_none());
let deleted = db.delete_setting("user1", "theme").await.expect("delete");
assert!(!deleted);
}
#[tokio::test]
async fn test_harness_with_channel() {
let harness = TestHarnessBuilder::new().with_stub_channel().build().await;
let (sender, channel_manager) =
harness.channel.as_ref().expect("channel should be present");
sender
.send(IncomingMessage::new("stub", "user1", "test message"))
.await
.expect("send failed");
let names = channel_manager.channel_names().await;
assert!(names.contains(&"stub".to_string()));
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_settings_bulk_operations() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let has = db.has_settings("bulk_user").await.expect("has_settings");
assert!(!has);
let mut settings = std::collections::HashMap::new();
settings.insert("key1".to_string(), serde_json::json!("value1"));
settings.insert("key2".to_string(), serde_json::json!(42));
db.set_all_settings("bulk_user", &settings)
.await
.expect("set_all");
let has = db.has_settings("bulk_user").await.expect("has_settings");
assert!(has);
let all = db.get_all_settings("bulk_user").await.expect("get_all");
assert_eq!(all.len(), 2);
assert_eq!(all["key1"], serde_json::json!("value1"));
assert_eq!(all["key2"], serde_json::json!(42));
let full = db
.get_setting_full("bulk_user", "key1")
.await
.expect("get_full")
.expect("should exist");
assert_eq!(full.key, "key1");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_tool_failure_tracking() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
db.record_tool_failure("bad_tool", "connection refused")
.await
.expect("record 1");
db.record_tool_failure("bad_tool", "timeout")
.await
.expect("record 2");
db.record_tool_failure("bad_tool", "parse error")
.await
.expect("record 3");
let broken = db.get_broken_tools(2).await.expect("get broken");
assert!(!broken.is_empty());
let found = broken.iter().find(|b| b.name == "bad_tool");
assert!(found.is_some(), "bad_tool should be in broken tools list");
db.mark_tool_repaired("bad_tool")
.await
.expect("mark repaired");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_routine_crud() {
use crate::agent::routine::{
NotifyConfig, Routine, RoutineAction, RoutineGuardrails, RoutineRun, RunStatus, Trigger,
};
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let routine_id = uuid::Uuid::new_v4();
let routine = Routine {
id: routine_id,
name: "test-routine".to_string(),
description: "A test routine".to_string(),
user_id: "user1".to_string(),
enabled: true,
trigger: Trigger::Cron {
schedule: "0 * * * *".to_string(),
timezone: None,
},
action: RoutineAction::Lightweight {
prompt: "Check status".to_string(),
context_paths: vec![],
max_tokens: 500,
use_tools: false,
max_tool_rounds: 3,
},
guardrails: RoutineGuardrails {
cooldown: std::time::Duration::from_secs(60),
max_concurrent: 1,
dedup_window: None,
},
notify: NotifyConfig {
channel: None,
user: Some("user1".to_string()),
on_attention: true,
on_failure: true,
on_success: false,
},
last_run_at: None,
next_fire_at: None,
run_count: 0,
consecutive_failures: 0,
state: serde_json::json!({}),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
db.create_routine(&routine).await.expect("create routine");
let fetched = db
.get_routine(routine_id)
.await
.expect("get routine")
.expect("should exist");
assert_eq!(fetched.name, "test-routine");
assert!(fetched.enabled);
let by_name = db
.get_routine_by_name("user1", "test-routine")
.await
.expect("get by name")
.expect("should exist");
assert_eq!(by_name.id, routine_id);
let list = db.list_routines("user1").await.expect("list routines");
assert_eq!(list.len(), 1);
let all = db.list_all_routines().await.expect("list all");
assert!(!all.is_empty());
let mut updated = fetched;
updated.enabled = false;
updated.description = "Updated description".to_string();
db.update_routine(&updated).await.expect("update routine");
let re_fetched = db
.get_routine(routine_id)
.await
.expect("get")
.expect("exists");
assert!(!re_fetched.enabled);
assert_eq!(re_fetched.description, "Updated description");
let run_id = uuid::Uuid::new_v4();
let run = RoutineRun {
id: run_id,
routine_id,
trigger_type: "cron".to_string(),
trigger_detail: Some("0 * * * *".to_string()),
started_at: chrono::Utc::now(),
completed_at: None,
status: RunStatus::Running,
result_summary: None,
tokens_used: None,
job_id: None,
created_at: chrono::Utc::now(),
};
db.create_routine_run(&run).await.expect("create run");
let runs = db
.list_routine_runs(routine_id, 10)
.await
.expect("list runs");
assert_eq!(runs.len(), 1);
assert!(matches!(runs[0].status, RunStatus::Running));
db.complete_routine_run(run_id, RunStatus::Ok, Some("All good"), Some(150))
.await
.expect("complete run");
let runs = db
.list_routine_runs(routine_id, 10)
.await
.expect("list runs after complete");
assert!(matches!(runs[0].status, RunStatus::Ok));
let deleted = db.delete_routine(routine_id).await.expect("delete");
assert!(deleted);
let deleted = db.delete_routine(routine_id).await.expect("delete again");
assert!(!deleted);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_routine_runtime_update() {
use crate::agent::routine::{
NotifyConfig, Routine, RoutineAction, RoutineGuardrails, Trigger,
};
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let routine_id = uuid::Uuid::new_v4();
let routine = Routine {
id: routine_id,
name: "runtime-test".to_string(),
description: "Test runtime update".to_string(),
user_id: "user1".to_string(),
enabled: true,
trigger: Trigger::Manual,
action: RoutineAction::Lightweight {
prompt: "test".to_string(),
context_paths: vec![],
max_tokens: 100,
use_tools: false,
max_tool_rounds: 3,
},
guardrails: RoutineGuardrails {
cooldown: std::time::Duration::from_secs(0),
max_concurrent: 1,
dedup_window: None,
},
notify: NotifyConfig {
channel: None,
user: Some("user1".to_string()),
on_attention: false,
on_failure: false,
on_success: false,
},
last_run_at: None,
next_fire_at: None,
run_count: 0,
consecutive_failures: 0,
state: serde_json::json!({}),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
db.create_routine(&routine).await.expect("create");
let now = chrono::Utc::now();
db.update_routine_runtime(
routine_id,
now,
Some(now + chrono::TimeDelta::seconds(3600)),
5,
2,
&serde_json::json!({"last_result": "ok"}),
)
.await
.expect("update runtime");
let fetched = db
.get_routine(routine_id)
.await
.expect("get")
.expect("exists");
assert_eq!(fetched.run_count, 5);
assert_eq!(fetched.consecutive_failures, 2);
assert!(fetched.last_run_at.is_some());
assert!(fetched.next_fire_at.is_some());
db.delete_routine(routine_id).await.expect("delete");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_llm_call_recording() {
use crate::history::LlmCallRecord;
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let record = LlmCallRecord {
job_id: None,
conversation_id: None,
provider: "openai",
model: "gpt-4",
input_tokens: 100,
output_tokens: 50,
cost: Decimal::new(5, 3), purpose: Some("test"),
};
let call_id = db.record_llm_call(&record).await.expect("record llm call");
assert!(!call_id.is_nil());
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_sandbox_job_lifecycle() {
use crate::history::SandboxJobRecord;
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let job_id = uuid::Uuid::new_v4();
let job = SandboxJobRecord {
id: job_id,
task: "Build a test tool".to_string(),
status: "creating".to_string(),
user_id: "user1".to_string(),
project_dir: "/workspace/test".to_string(),
success: None,
failure_reason: None,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
credential_grants_json: "[]".to_string(),
};
db.save_sandbox_job(&job).await.expect("save sandbox job");
let fetched = db
.get_sandbox_job(job_id)
.await
.expect("get")
.expect("should exist");
assert_eq!(fetched.task, "Build a test tool");
assert_eq!(fetched.status, "creating");
db.update_sandbox_job_status(
job_id,
"running",
None,
None,
Some(chrono::Utc::now()),
None,
)
.await
.expect("update to running");
db.update_sandbox_job_status(
job_id,
"completed",
Some(true),
Some("Done"),
None,
Some(chrono::Utc::now()),
)
.await
.expect("update to completed");
let fetched = db
.get_sandbox_job(job_id)
.await
.expect("get")
.expect("should exist");
assert_eq!(fetched.status, "completed");
assert_eq!(fetched.success, Some(true));
let all = db.list_sandbox_jobs().await.expect("list");
assert!(!all.is_empty());
let summary = db.sandbox_job_summary().await.expect("summary");
assert!(summary.total >= 1);
let user_jobs = db
.list_sandbox_jobs_for_user("user1")
.await
.expect("user list");
assert!(!user_jobs.is_empty());
let belongs = db
.sandbox_job_belongs_to_user(job_id, "user1")
.await
.expect("belongs check");
assert!(belongs);
let not_belongs = db
.sandbox_job_belongs_to_user(job_id, "other_user")
.await
.expect("belongs check");
assert!(!not_belongs);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_sandbox_job_mode() {
use crate::history::SandboxJobRecord;
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let job_id = uuid::Uuid::new_v4();
let job = SandboxJobRecord {
id: job_id,
task: "Mode test".to_string(),
status: "creating".to_string(),
user_id: "user1".to_string(),
project_dir: "/workspace".to_string(),
success: None,
failure_reason: None,
created_at: chrono::Utc::now(),
started_at: None,
completed_at: None,
credential_grants_json: "[]".to_string(),
};
db.save_sandbox_job(&job).await.expect("save");
let mode = db.get_sandbox_job_mode(job_id).await.expect("get mode");
assert!(mode.is_none() || mode.as_deref() == Some("worker"));
db.update_sandbox_job_mode(job_id, "claude_code")
.await
.expect("update mode");
let mode = db
.get_sandbox_job_mode(job_id)
.await
.expect("get mode")
.expect("should have mode");
assert_eq!(mode, "claude_code");
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_job_events() {
use crate::history::SandboxJobRecord;
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let job_id = uuid::Uuid::new_v4();
let job = SandboxJobRecord {
id: job_id,
task: "Event test".to_string(),
status: "running".to_string(),
user_id: "user1".to_string(),
project_dir: "/workspace".to_string(),
success: None,
failure_reason: None,
created_at: chrono::Utc::now(),
started_at: Some(chrono::Utc::now()),
completed_at: None,
credential_grants_json: "[]".to_string(),
};
db.save_sandbox_job(&job).await.expect("save job");
db.save_job_event(
job_id,
"tool_call",
&serde_json::json!({"tool": "shell", "args": {"command": "ls"}}),
)
.await
.expect("save event 1");
db.save_job_event(
job_id,
"tool_result",
&serde_json::json!({"output": "file1.txt\nfile2.txt"}),
)
.await
.expect("save event 2");
db.save_job_event(
job_id,
"llm_response",
&serde_json::json!({"content": "Found 2 files"}),
)
.await
.expect("save event 3");
let events = db.list_job_events(job_id, None).await.expect("list events");
assert_eq!(events.len(), 3);
let events = db
.list_job_events(job_id, Some(2))
.await
.expect("list events limited");
assert_eq!(events.len(), 2);
}
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_estimation_snapshot_round_trip() {
let harness = TestHarnessBuilder::new().build().await;
let db = &harness.db;
let job_ctx = crate::context::JobContext::with_user("user1", "Estimate test", "testing");
let job_id = job_ctx.job_id;
db.save_job(&job_ctx).await.expect("save job");
let snap_id = db
.save_estimation_snapshot(
job_id,
"code_generation",
&["shell".to_string(), "write_file".to_string()],
Decimal::new(50, 2), 120,
Decimal::new(500, 2), )
.await
.expect("save snapshot");
assert!(!snap_id.is_nil());
db.update_estimation_actuals(
snap_id,
Decimal::new(45, 2), 110,
Some(Decimal::new(600, 2)), )
.await
.expect("update actuals");
}
#[tokio::test]
async fn stub_llm_fault_injector_sequence() {
use crate::llm::LlmProvider;
use crate::testing::fault_injection::{FaultAction, FaultInjector, FaultType};
let injector = Arc::new(FaultInjector::sequence([
FaultAction::Fail(FaultType::RateLimited { retry_after: None }),
FaultAction::Succeed,
]));
let stub = StubLlm::new("hello").with_fault_injector(injector);
let req = crate::llm::CompletionRequest::new(vec![crate::llm::ChatMessage::user("hi")]);
let result = stub.complete(req.clone()).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), LlmError::RateLimited { .. }));
let result = stub.complete(req).await;
assert!(result.is_ok());
assert_eq!(result.unwrap().content, "hello");
}
}