use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use jiff::Timestamp;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use crate::error::{RlmError, RlmResult};
use crate::query_loop::TerminationReason;
use crate::types::{BudgetStatus, Command, SessionId};
/// One record in a trajectory log.
///
/// Serialized by serde as an internally tagged object: the variant name is
/// written in snake_case under the `event_type` key (for example
/// `"event_type": "llm_call"`) next to the variant's own fields. Every variant
/// carries a `timestamp` and a `session_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type", rename_all = "snake_case")]
pub enum TrajectoryEvent {
/// Built by `TrajectoryLogger::log_session_start` with the budget passed by the caller.
SessionStart {
timestamp: Timestamp,
session_id: SessionId,
budget: BudgetStatus,
},
/// Built by `TrajectoryLogger::log_query_start`.
QueryStart {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
/// Prompt text; the logger truncates it to the configured maximum length.
initial_prompt: String,
parent_query_id: Option<Ulid>,
depth: u32,
},
/// Built by `TrajectoryLogger::log_llm_call`.
LlmCall {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
iteration: u32,
/// Prompt as logged: possibly truncated, or `"<redacted>"` when content logging is off.
prompt: String,
/// The logger sets this to the byte length (`str::len`) of the original prompt.
prompt_length: usize,
},
/// Built by `TrajectoryLogger::log_llm_response`.
LlmResponse {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
iteration: u32,
/// Response as logged: possibly truncated, or `"<redacted>"` when content logging is off.
response: String,
/// The logger sets this to the byte length (`str::len`) of the original response.
response_length: usize,
tokens_used: u64,
latency_ms: u64,
},
/// Built by `TrajectoryLogger::log_command_parsed`.
CommandParsed {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
iteration: u32,
command: Command,
/// Short label the logger derives from the `Command` variant (e.g. `"run"`, `"code"`, `"final"`).
command_type: String,
},
/// Built by `TrajectoryLogger::log_command_parse_failed`.
CommandParseFailed {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
iteration: u32,
/// Raw text that failed to parse; the logger truncates it to the configured maximum length.
raw_response: String,
error: String,
},
/// Built by `TrajectoryLogger::log_command_executed`.
CommandExecuted {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
iteration: u32,
command: Command,
success: bool,
/// Captured output as logged: possibly truncated, or `"<redacted>"` when output logging is off.
stdout: String,
/// Captured error output as logged: possibly truncated, or `"<redacted>"` when output logging is off.
stderr: String,
exit_code: Option<i32>,
execution_time_ms: u64,
},
/// Built by `TrajectoryLogger::log_query_complete`.
QueryComplete {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
total_iterations: u32,
/// Final result text, if any; the logger truncates it to the configured maximum length.
result: Option<String>,
success: bool,
/// Snake_case label the logger derives from `TerminationReason` (e.g. `"final_reached"`, `"cancelled"`).
termination_reason: String,
duration_ms: u64,
total_tokens: u64,
},
/// Built by `TrajectoryLogger::log_session_end`.
SessionEnd {
timestamp: Timestamp,
session_id: SessionId,
total_queries: u32,
total_tokens: u64,
duration_ms: u64,
},
/// Built by `TrajectoryLogger::log_budget_warning`.
BudgetWarning {
timestamp: Timestamp,
session_id: SessionId,
query_id: Ulid,
warning_type: String,
current: u64,
maximum: u64,
/// The logger computes `current / maximum * 100`, or `100.0` when `maximum` is zero.
percentage: f64,
},
/// Built by `TrajectoryLogger::log_error`; `query_id` is optional.
Error {
timestamp: Timestamp,
session_id: SessionId,
query_id: Option<Ulid>,
error: String,
category: String,
},
}
impl TrajectoryEvent {
    /// Returns the timestamp stored in the event, whichever variant it is.
    pub fn timestamp(&self) -> Timestamp {
        // Every variant has a `timestamp` field of the same type, so a single
        // or-pattern arm can bind it for all of them.
        match self {
            Self::SessionStart { timestamp, .. }
            | Self::QueryStart { timestamp, .. }
            | Self::LlmCall { timestamp, .. }
            | Self::LlmResponse { timestamp, .. }
            | Self::CommandParsed { timestamp, .. }
            | Self::CommandParseFailed { timestamp, .. }
            | Self::CommandExecuted { timestamp, .. }
            | Self::QueryComplete { timestamp, .. }
            | Self::SessionEnd { timestamp, .. }
            | Self::BudgetWarning { timestamp, .. }
            | Self::Error { timestamp, .. } => *timestamp,
        }
    }

    /// Returns the session id stored in the event, whichever variant it is.
    pub fn session_id(&self) -> SessionId {
        match self {
            Self::SessionStart { session_id, .. }
            | Self::QueryStart { session_id, .. }
            | Self::LlmCall { session_id, .. }
            | Self::LlmResponse { session_id, .. }
            | Self::CommandParsed { session_id, .. }
            | Self::CommandParseFailed { session_id, .. }
            | Self::CommandExecuted { session_id, .. }
            | Self::QueryComplete { session_id, .. }
            | Self::SessionEnd { session_id, .. }
            | Self::BudgetWarning { session_id, .. }
            | Self::Error { session_id, .. } => *session_id,
        }
    }

    /// Returns the snake_case name of the variant as a static string.
    ///
    /// The labels are spelled the same way as the `event_type` tag produced by
    /// the serde attributes on the enum.
    pub fn event_type(&self) -> &'static str {
        // Kept exhaustive (no `_` arm) so adding a variant forces an update here.
        match self {
            Self::SessionStart { .. } => "session_start",
            Self::QueryStart { .. } => "query_start",
            Self::LlmCall { .. } => "llm_call",
            Self::LlmResponse { .. } => "llm_response",
            Self::CommandParsed { .. } => "command_parsed",
            Self::CommandParseFailed { .. } => "command_parse_failed",
            Self::CommandExecuted { .. } => "command_executed",
            Self::QueryComplete { .. } => "query_complete",
            Self::SessionEnd { .. } => "session_end",
            Self::BudgetWarning { .. } => "budget_warning",
            Self::Error { .. } => "error",
        }
    }
}
/// Destination that trajectory events are written to.
///
/// Implementors must be `Send + Sync`; both methods take `&mut self`.
trait LogBackend: Send + Sync {
/// Records one event, returning an error if it could not be stored.
fn write(&mut self, event: &TrajectoryEvent) -> RlmResult<()>;
/// Pushes any buffered data through to the underlying destination.
fn flush(&mut self) -> RlmResult<()>;
}
/// Backend that writes events to a file through a buffered writer.
struct FileBackend {
/// Buffered handle to the log file.
writer: BufWriter<std::fs::File>,
/// Path the file was opened from; stored but not read by the code in this file, hence the lint allowance.
#[allow(dead_code)] path: PathBuf,
}
impl FileBackend {
    /// Opens the log file at `path` for appending, creating it if necessary.
    ///
    /// Any missing parent directories are created before the file is opened.
    ///
    /// # Errors
    ///
    /// Returns `RlmError::ConfigError` when the parent directory cannot be
    /// created or the file cannot be opened.
    fn new(path: impl AsRef<Path>) -> RlmResult<Self> {
        let target = path.as_ref();

        if let Some(dir) = target.parent() {
            std::fs::create_dir_all(dir).map_err(|err| RlmError::ConfigError {
                message: format!("Failed to create log directory: {}", err),
            })?;
        }

        // Append mode keeps whatever is already in the file.
        let mut options = std::fs::OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(target).map_err(|err| RlmError::ConfigError {
            message: format!("Failed to open log file: {}", err),
        })?;

        Ok(Self {
            writer: BufWriter::new(handle),
            path: target.to_path_buf(),
        })
    }
}
impl LogBackend for FileBackend {
    /// Serializes `event` to JSON and writes it as one newline-terminated line.
    ///
    /// The line goes into the buffered writer; it is not flushed here.
    ///
    /// # Errors
    ///
    /// Returns `RlmError::ConfigError` if serialization or the write fails.
    fn write(&mut self, event: &TrajectoryEvent) -> RlmResult<()> {
        let encoded = serde_json::to_string(event).map_err(|err| RlmError::ConfigError {
            message: format!("Failed to serialize event: {}", err),
        })?;
        self.writer
            .write_fmt(format_args!("{}\n", encoded))
            .map_err(|err| RlmError::ConfigError {
                message: format!("Failed to write to log file: {}", err),
            })?;
        Ok(())
    }

    /// Flushes the buffered writer.
    ///
    /// # Errors
    ///
    /// Returns `RlmError::ConfigError` if the flush fails.
    fn flush(&mut self) -> RlmResult<()> {
        let outcome = self.writer.flush();
        outcome.map_err(|err| RlmError::ConfigError {
            message: format!("Failed to flush log file: {}", err),
        })?;
        Ok(())
    }
}
/// Backend that keeps every written event in a vector.
struct MemoryBackend {
/// Events in the order they were written.
events: Vec<TrajectoryEvent>,
}
impl MemoryBackend {
    /// Creates a backend whose event buffer starts out empty.
    fn new() -> Self {
        let events = Vec::new();
        Self { events }
    }

    /// Borrows the events recorded so far.
    // Not called from the code visible in this file, hence the lint allowance.
    #[allow(dead_code)]
    fn events(&self) -> &[TrajectoryEvent] {
        self.events.as_slice()
    }
}
impl LogBackend for MemoryBackend {
    /// Appends an owned copy of `event` to the buffer; always succeeds.
    fn write(&mut self, event: &TrajectoryEvent) -> RlmResult<()> {
        let owned = event.clone();
        self.events.push(owned);
        Ok(())
    }

    /// No-op: the vector is the only storage, so there is nothing to flush.
    fn flush(&mut self) -> RlmResult<()> {
        Ok(())
    }
}
/// Settings that control what `TrajectoryLogger` records and where.
#[derive(Debug, Clone)]
pub struct TrajectoryLoggerConfig {
/// When `false`, `TrajectoryLogger::log` and `TrajectoryLogger::flush` return `Ok(())` without touching the backend.
pub enabled: bool,
/// `Some(path)` selects the file backend; `None` selects the in-memory backend.
pub log_path: Option<PathBuf>,
/// Byte-length threshold above which logged text is truncated.
pub max_content_length: usize,
/// When `false`, LLM prompts and responses are logged as `"<redacted>"`.
pub log_llm_content: bool,
/// When `false`, command stdout and stderr are logged as `"<redacted>"`.
pub log_command_output: bool,
/// Number of logged events after which the backend is flushed automatically; `0` disables automatic flushing.
pub flush_interval: u32,
}
impl Default for TrajectoryLoggerConfig {
    /// Returns the default configuration.
    ///
    /// Logging is enabled with no log path, full content and command output
    /// are recorded, text is capped at 10 000 bytes, and the backend is
    /// flushed every 10 events.
    fn default() -> Self {
        const DEFAULT_MAX_CONTENT_LENGTH: usize = 10_000;
        const DEFAULT_FLUSH_INTERVAL: u32 = 10;

        Self {
            enabled: true,
            log_path: None,
            log_llm_content: true,
            log_command_output: true,
            max_content_length: DEFAULT_MAX_CONTENT_LENGTH,
            flush_interval: DEFAULT_FLUSH_INTERVAL,
        }
    }
}
impl TrajectoryLoggerConfig {
pub fn with_file(path: impl AsRef<Path>) -> Self {
Self {
log_path: Some(path.as_ref().to_path_buf()),
..Default::default()
}
}
pub fn in_memory() -> Self {
Self {
log_path: None,
..Default::default()
}
}
pub fn metadata_only(mut self) -> Self {
self.log_llm_content = false;
self.log_command_output = false;
self
}
}
/// Writes `TrajectoryEvent`s to a backend chosen from its configuration.
pub struct TrajectoryLogger {
/// Settings supplied at construction time.
config: TrajectoryLoggerConfig,
/// Where events are written; the mutex lets logging work through `&self`.
backend: Arc<Mutex<Box<dyn LogBackend>>>,
/// Number of events written since the backend was last flushed.
events_since_flush: Arc<Mutex<u32>>,
}
impl TrajectoryLogger {
    /// Builds a logger from `config`.
    ///
    /// A file backend is opened when `config.log_path` is set; otherwise events
    /// go to an in-memory backend.
    ///
    /// # Errors
    ///
    /// Propagates the error from opening the log file.
    pub fn new(config: TrajectoryLoggerConfig) -> RlmResult<Self> {
        let backend: Box<dyn LogBackend> = match config.log_path.as_deref() {
            Some(path) => Box::new(FileBackend::new(path)?),
            None => Box::new(MemoryBackend::new()),
        };
        Ok(Self {
            config,
            backend: Arc::new(Mutex::new(backend)),
            events_since_flush: Arc::new(Mutex::new(0)),
        })
    }

    /// Convenience constructor: default settings, logging to the file at `path`.
    ///
    /// # Errors
    ///
    /// Propagates the error from opening the log file.
    pub fn to_file(path: impl AsRef<Path>) -> RlmResult<Self> {
        Self::new(TrajectoryLoggerConfig::with_file(path))
    }

    /// Convenience constructor: default settings, logging to memory.
    pub fn in_memory() -> RlmResult<Self> {
        Self::new(TrajectoryLoggerConfig::in_memory())
    }

    /// Creates a logger whose `log` and `flush` calls do nothing and return `Ok(())`.
    pub fn disabled() -> Self {
        Self {
            config: TrajectoryLoggerConfig {
                enabled: false,
                ..Default::default()
            },
            backend: Arc::new(Mutex::new(Box::new(MemoryBackend::new()))),
            events_since_flush: Arc::new(Mutex::new(0)),
        }
    }

    /// Writes `event` to the backend and flushes once `flush_interval` events
    /// have accumulated (a `flush_interval` of 0 disables automatic flushing).
    ///
    /// Does nothing when logging is disabled.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `write` or `flush`.
    pub fn log(&self, event: TrajectoryEvent) -> RlmResult<()> {
        if !self.config.enabled {
            return Ok(());
        }
        // Lock order (backend first, then counter) is the same as in `flush`.
        let mut backend = self.backend.lock();
        backend.write(&event)?;

        let mut pending = self.events_since_flush.lock();
        // Saturate rather than overflow: with `flush_interval == 0` the counter
        // is never reset by this method.
        *pending = (*pending).saturating_add(1);
        if self.config.flush_interval > 0 && *pending >= self.config.flush_interval {
            backend.flush()?;
            *pending = 0;
        }
        Ok(())
    }

    /// Flushes the backend and resets the pending-event counter.
    ///
    /// Does nothing when logging is disabled.
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `flush`.
    pub fn flush(&self) -> RlmResult<()> {
        if !self.config.enabled {
            return Ok(());
        }
        let mut backend = self.backend.lock();
        backend.flush()?;
        *self.events_since_flush.lock() = 0;
        Ok(())
    }

    /// Returns the events held in memory, if they can be retrieved.
    ///
    /// NOTE(review): this currently returns `None` for every logger, including
    /// in-memory ones. The `LogBackend` trait only exposes `write` and `flush`,
    /// so stored events cannot be read back through the boxed backend. Making
    /// this work needs a read accessor on the trait.
    pub fn get_events(&self) -> Option<Vec<TrajectoryEvent>> {
        None
    }

    /// Returns the configured log file path, or `None` if no path is configured.
    pub fn log_path(&self) -> Option<&Path> {
        self.config.log_path.as_deref()
    }

    /// Logs a `SessionStart` event stamped with the current time.
    pub fn log_session_start(&self, session_id: SessionId, budget: BudgetStatus) -> RlmResult<()> {
        self.log(TrajectoryEvent::SessionStart {
            timestamp: Timestamp::now(),
            session_id,
            budget,
        })
    }

    /// Logs a `QueryStart` event; `initial_prompt` is truncated to the
    /// configured maximum length.
    pub fn log_query_start(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        initial_prompt: &str,
        parent_query_id: Option<Ulid>,
        depth: u32,
    ) -> RlmResult<()> {
        let initial_prompt = self.truncate_content(initial_prompt);
        self.log(TrajectoryEvent::QueryStart {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            initial_prompt,
            parent_query_id,
            depth,
        })
    }

    /// Logs an `LlmCall` event.
    ///
    /// The prompt is truncated, or replaced by `"<redacted>"` when
    /// `log_llm_content` is off. `prompt_length` is always the byte length of
    /// the original prompt.
    pub fn log_llm_call(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        iteration: u32,
        prompt: &str,
    ) -> RlmResult<()> {
        let logged_prompt = if self.config.log_llm_content {
            self.truncate_content(prompt)
        } else {
            "<redacted>".to_string()
        };
        self.log(TrajectoryEvent::LlmCall {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            iteration,
            prompt_length: prompt.len(),
            prompt: logged_prompt,
        })
    }

    /// Logs an `LlmResponse` event.
    ///
    /// The response is truncated, or replaced by `"<redacted>"` when
    /// `log_llm_content` is off. `response_length` is always the byte length
    /// of the original response.
    pub fn log_llm_response(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        iteration: u32,
        response: &str,
        tokens_used: u64,
        latency_ms: u64,
    ) -> RlmResult<()> {
        let logged_response = if self.config.log_llm_content {
            self.truncate_content(response)
        } else {
            "<redacted>".to_string()
        };
        self.log(TrajectoryEvent::LlmResponse {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            iteration,
            response_length: response.len(),
            response: logged_response,
            tokens_used,
            latency_ms,
        })
    }

    /// Logs a `CommandParsed` event with a clone of `command` and a short
    /// label naming its variant.
    pub fn log_command_parsed(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        iteration: u32,
        command: &Command,
    ) -> RlmResult<()> {
        // Exhaustive on purpose: a new `Command` variant must get a label here.
        let command_type = match command {
            Command::Run(_) => "run",
            Command::Code(_) => "code",
            Command::Final(_) => "final",
            Command::FinalVar(_) => "final_var",
            Command::QueryLlm(_) => "query_llm",
            Command::QueryLlmBatched(_) => "query_llm_batched",
            Command::Snapshot(_) => "snapshot",
            Command::Rollback(_) => "rollback",
        };
        self.log(TrajectoryEvent::CommandParsed {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            iteration,
            command: command.clone(),
            command_type: command_type.to_string(),
        })
    }

    /// Logs a `CommandParseFailed` event; `raw_response` is truncated to the
    /// configured maximum length, `error` is stored as given.
    pub fn log_command_parse_failed(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        iteration: u32,
        raw_response: &str,
        error: &str,
    ) -> RlmResult<()> {
        self.log(TrajectoryEvent::CommandParseFailed {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            iteration,
            raw_response: self.truncate_content(raw_response),
            error: error.to_string(),
        })
    }

    /// Logs a `CommandExecuted` event.
    ///
    /// `stdout` and `stderr` are truncated, or both replaced by
    /// `"<redacted>"` when `log_command_output` is off.
    // The parameters mirror the event's fields one-to-one.
    #[allow(clippy::too_many_arguments)]
    pub fn log_command_executed(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        iteration: u32,
        command: &Command,
        success: bool,
        stdout: &str,
        stderr: &str,
        exit_code: Option<i32>,
        execution_time_ms: u64,
    ) -> RlmResult<()> {
        let (logged_stdout, logged_stderr) = if self.config.log_command_output {
            (self.truncate_content(stdout), self.truncate_content(stderr))
        } else {
            ("<redacted>".to_string(), "<redacted>".to_string())
        };
        self.log(TrajectoryEvent::CommandExecuted {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            iteration,
            command: command.clone(),
            success,
            stdout: logged_stdout,
            stderr: logged_stderr,
            exit_code,
            execution_time_ms,
        })
    }

    /// Logs a `QueryComplete` event.
    ///
    /// `termination_reason` is stored as a snake_case label and `result`, when
    /// present, is truncated to the configured maximum length.
    // The parameters mirror the event's fields one-to-one.
    #[allow(clippy::too_many_arguments)]
    pub fn log_query_complete(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        total_iterations: u32,
        result: Option<&str>,
        success: bool,
        termination_reason: &TerminationReason,
        duration_ms: u64,
        total_tokens: u64,
    ) -> RlmResult<()> {
        // Exhaustive on purpose: a new `TerminationReason` must get a label here.
        let reason_label = match termination_reason {
            TerminationReason::FinalReached => "final_reached",
            TerminationReason::FinalVarReached { .. } => "final_var_reached",
            TerminationReason::TokenBudgetExhausted => "token_budget_exhausted",
            TerminationReason::TimeBudgetExhausted => "time_budget_exhausted",
            TerminationReason::MaxIterationsReached => "max_iterations_reached",
            TerminationReason::RecursionDepthExhausted => "recursion_depth_exhausted",
            TerminationReason::Error { .. } => "error",
            TerminationReason::Cancelled => "cancelled",
        };
        self.log(TrajectoryEvent::QueryComplete {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            total_iterations,
            result: result.map(|text| self.truncate_content(text)),
            success,
            termination_reason: reason_label.to_string(),
            duration_ms,
            total_tokens,
        })
    }

    /// Logs a `SessionEnd` event stamped with the current time.
    pub fn log_session_end(
        &self,
        session_id: SessionId,
        total_queries: u32,
        total_tokens: u64,
        duration_ms: u64,
    ) -> RlmResult<()> {
        self.log(TrajectoryEvent::SessionEnd {
            timestamp: Timestamp::now(),
            session_id,
            total_queries,
            total_tokens,
            duration_ms,
        })
    }

    /// Logs a `BudgetWarning` event.
    ///
    /// `percentage` is `current / maximum * 100`; a `maximum` of zero is
    /// reported as `100.0` rather than dividing by zero.
    pub fn log_budget_warning(
        &self,
        session_id: SessionId,
        query_id: Ulid,
        warning_type: &str,
        current: u64,
        maximum: u64,
    ) -> RlmResult<()> {
        let percentage = if maximum > 0 {
            (current as f64 / maximum as f64) * 100.0
        } else {
            100.0
        };
        self.log(TrajectoryEvent::BudgetWarning {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            warning_type: warning_type.to_string(),
            current,
            maximum,
            percentage,
        })
    }

    /// Logs an `Error` event; `query_id` may be `None`.
    pub fn log_error(
        &self,
        session_id: SessionId,
        query_id: Option<Ulid>,
        error: &str,
        category: &str,
    ) -> RlmResult<()> {
        self.log(TrajectoryEvent::Error {
            timestamp: Timestamp::now(),
            session_id,
            query_id,
            error: error.to_string(),
            category: category.to_string(),
        })
    }

    /// Returns `content` unchanged if it fits within `max_content_length`
    /// bytes; otherwise returns a prefix followed by a truncation marker.
    ///
    /// The prefix is at most `max_content_length` bytes and always ends on a
    /// UTF-8 character boundary, so multi-byte text never causes a panic. The
    /// total reported in the marker is the byte length of the original
    /// content.
    fn truncate_content(&self, content: &str) -> String {
        let limit = self.config.max_content_length;
        if content.len() <= limit {
            return content.to_string();
        }
        // Slicing at an arbitrary byte offset panics inside a multi-byte
        // character, so back up to the nearest boundary. Index 0 is always a
        // boundary, which guarantees the loop terminates.
        let mut cut = limit;
        while !content.is_char_boundary(cut) {
            cut -= 1;
        }
        format!(
            "{}... [truncated, {} chars total]",
            &content[..cut],
            content.len()
        )
    }
}
/// Reads a trajectory log written as one JSON event per line.
///
/// The whole file is read into memory. Lines that are empty or contain only
/// whitespace are skipped; every other line must deserialize into a
/// `TrajectoryEvent`.
///
/// # Errors
///
/// Returns `RlmError::ConfigError` if the file cannot be read, or on the
/// first line that fails to parse. The message carries the 1-based line
/// number.
pub fn read_trajectory_file(path: impl AsRef<Path>) -> RlmResult<Vec<TrajectoryEvent>> {
    let text = std::fs::read_to_string(path.as_ref()).map_err(|err| RlmError::ConfigError {
        message: format!("Failed to read trajectory file: {}", err),
    })?;

    let mut parsed = Vec::new();
    // Number lines before filtering so skipped blank lines still count.
    let numbered = (1usize..).zip(text.lines());
    for (number, raw) in numbered.filter(|(_, raw)| !raw.trim().is_empty()) {
        let event = serde_json::from_str::<TrajectoryEvent>(raw).map_err(|err| {
            RlmError::ConfigError {
                message: format!(
                    "Failed to parse trajectory event at line {}: {}",
                    number, err
                ),
            }
        })?;
        parsed.push(event);
    }
    Ok(parsed)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The accessors report the variant tag and the stored session id.
    #[test]
    fn test_trajectory_event_types() {
        let sid = SessionId::new();
        let start = TrajectoryEvent::SessionStart {
            timestamp: Timestamp::now(),
            session_id: sid,
            budget: BudgetStatus::default(),
        };
        assert_eq!(start.event_type(), "session_start");
        assert_eq!(start.session_id(), sid);
    }

    /// A disabled logger accepts events without error.
    #[test]
    fn test_trajectory_logger_disabled() {
        let logger = TrajectoryLogger::disabled();
        let sid = SessionId::new();
        let outcome = logger.log_session_start(sid, BudgetStatus::default());
        assert!(outcome.is_ok());
    }

    /// An in-memory logger can log and flush.
    #[test]
    fn test_trajectory_logger_in_memory() {
        let logger = TrajectoryLogger::in_memory().unwrap();
        let sid = SessionId::new();
        logger
            .log_session_start(sid, BudgetStatus::default())
            .unwrap();
        logger.flush().unwrap();
    }

    /// An event survives a JSON round trip and carries its snake_case tag.
    #[test]
    fn test_trajectory_event_serialization() {
        let original = TrajectoryEvent::LlmCall {
            timestamp: Timestamp::now(),
            session_id: SessionId::new(),
            query_id: Ulid::new(),
            iteration: 1,
            prompt: "Hello, world!".to_string(),
            prompt_length: 13,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains("llm_call"));
        assert!(encoded.contains("Hello, world!"));

        let decoded: TrajectoryEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.event_type(), "llm_call");
    }

    /// Defaults are as expected, and `metadata_only` turns content logging off.
    #[test]
    fn test_trajectory_logger_config() {
        let defaults = TrajectoryLoggerConfig::default();
        assert!(defaults.enabled);
        assert!(defaults.log_llm_content);
        assert!(defaults.log_command_output);
        assert_eq!(defaults.max_content_length, 10_000);

        let stripped = TrajectoryLoggerConfig::default().metadata_only();
        assert!(!stripped.log_llm_content);
        assert!(!stripped.log_command_output);
    }

    /// Short text passes through; long text keeps its prefix and gains a marker.
    #[test]
    fn test_truncate_content() {
        let logger = TrajectoryLogger::new(TrajectoryLoggerConfig {
            max_content_length: 20,
            ..Default::default()
        })
        .unwrap();

        assert_eq!(logger.truncate_content("Hello"), "Hello");

        let shortened =
            logger.truncate_content("This is a very long string that should be truncated");
        assert!(shortened.starts_with("This is a very long "));
        assert!(shortened.contains("truncated"));
    }

    /// Logging each parsed command kind succeeds.
    ///
    /// The expected labels are listed for reference only; the test does not
    /// read the logged events back, so they are not asserted.
    #[test]
    fn test_command_type_extraction() {
        use crate::types::{BashCommand, PythonCode};

        let logger = TrajectoryLogger::in_memory().unwrap();
        let sid = SessionId::new();
        let qid = Ulid::new();
        let cases = vec![
            (Command::Run(BashCommand::new("ls")), "run"),
            (Command::Code(PythonCode::new("print(1)")), "code"),
            (Command::Final("done".into()), "final"),
            (Command::FinalVar("x".into()), "final_var"),
        ];
        for (cmd, _expected_type) in cases {
            let outcome = logger.log_command_parsed(sid, qid, 1, &cmd);
            assert!(outcome.is_ok());
        }
    }

    /// Logging a completed query succeeds for every termination reason.
    ///
    /// The expected labels are listed for reference only; the test does not
    /// read the logged events back, so they are not asserted.
    #[test]
    fn test_termination_reason_to_string() {
        let cases = vec![
            (TerminationReason::FinalReached, "final_reached"),
            (
                TerminationReason::FinalVarReached {
                    variable: "x".into(),
                },
                "final_var_reached",
            ),
            (
                TerminationReason::TokenBudgetExhausted,
                "token_budget_exhausted",
            ),
            (
                TerminationReason::TimeBudgetExhausted,
                "time_budget_exhausted",
            ),
            (
                TerminationReason::MaxIterationsReached,
                "max_iterations_reached",
            ),
            (
                TerminationReason::RecursionDepthExhausted,
                "recursion_depth_exhausted",
            ),
            (
                TerminationReason::Error {
                    message: "test".into(),
                },
                "error",
            ),
            (TerminationReason::Cancelled, "cancelled"),
        ];

        let logger = TrajectoryLogger::in_memory().unwrap();
        let sid = SessionId::new();
        let qid = Ulid::new();
        for (reason, _expected) in cases {
            let outcome =
                logger.log_query_complete(sid, qid, 5, Some("result"), true, &reason, 1000, 500);
            assert!(outcome.is_ok());
        }
    }
}