agentic_jujutsu/
agentdb_sync.rs

1//! AgentDB synchronization for operation logs
2//!
3//! This module provides integration with AgentDB for storing and querying
4//! jj operation history, enabling AI agents to learn from past operations.
5
6use crate::{JJError, JJOperation, Result};
7#[cfg(not(target_arch = "wasm32"))]
8use crate::mcp::{MCPClient, MCPClientConfig};
9use serde::{Deserialize, Serialize};
10
11/// Episode data structure for AgentDB storage
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct AgentDBEpisode {
14    /// Session identifier
15    pub session_id: String,
16    /// Task description
17    pub task: String,
18    /// Agent identifier
19    pub agent_id: String,
20    /// Input context
21    pub input: Option<String>,
22    /// Output or result
23    pub output: Option<String>,
24    /// Self-critique or reflection
25    pub critique: Option<String>,
26    /// Success indicator
27    pub success: bool,
28    /// Reward score (0.0 to 1.0)
29    pub reward: f64,
30    /// Latency in milliseconds
31    pub latency_ms: Option<u64>,
32    /// Token usage count
33    pub tokens_used: Option<u64>,
34    /// Associated JJ operation
35    pub operation: Option<JJOperation>,
36    /// Unix timestamp
37    pub timestamp: i64,
38}
39
40impl AgentDBEpisode {
41    /// Create a new episode from a JJ operation
42    pub fn from_operation(op: &JJOperation, session_id: String, agent_id: String) -> Self {
43        Self {
44            session_id,
45            task: op.command.clone(),
46            agent_id,
47            input: None,
48            output: None,
49            critique: None,
50            success: true,
51            reward: 1.0,
52            latency_ms: None,
53            tokens_used: None,
54            operation: Some(op.clone()),
55            timestamp: op.timestamp.timestamp(),
56        }
57    }
58
59    /// Set input context
60    pub fn with_input(mut self, input: String) -> Self {
61        self.input = Some(input);
62        self
63    }
64
65    /// Set output result
66    pub fn with_output(mut self, output: String) -> Self {
67        self.output = Some(output);
68        self
69    }
70
71    /// Set critique/reflection
72    pub fn with_critique(mut self, critique: String) -> Self {
73        self.critique = Some(critique);
74        self
75    }
76
77    /// Set success and reward
78    pub fn with_success(mut self, success: bool, reward: f64) -> Self {
79        self.success = success;
80        self.reward = reward.clamp(0.0, 1.0);
81        self
82    }
83
84    /// Set performance metrics
85    pub fn with_metrics(mut self, latency_ms: u64, tokens_used: u64) -> Self {
86        self.latency_ms = Some(latency_ms);
87        self.tokens_used = Some(tokens_used);
88        self
89    }
90}
91
92/// AgentDB synchronization manager
93pub struct AgentDBSync {
94    /// Whether sync is enabled
95    enabled: bool,
96    /// Base URL for AgentDB API (if using remote)
97    api_url: Option<String>,
98    /// MCP client for AgentDB communication (native only)
99    #[cfg(not(target_arch = "wasm32"))]
100    mcp_client: Option<MCPClient>,
101}
102
103impl AgentDBSync {
104    /// Create a new AgentDB sync manager
105    pub fn new(enabled: bool) -> Self {
106        Self {
107            enabled,
108            api_url: None,
109            #[cfg(not(target_arch = "wasm32"))]
110            mcp_client: None,
111        }
112    }
113
114    /// Create with MCP client for real AgentDB communication (native only)
115    #[cfg(not(target_arch = "wasm32"))]
116    pub async fn with_mcp(enabled: bool, mcp_config: MCPClientConfig) -> Result<Self> {
117        let mcp_client = if enabled {
118            Some(MCPClient::new(mcp_config).await?)
119        } else {
120            None
121        };
122
123        Ok(Self {
124            enabled,
125            api_url: None,
126            mcp_client,
127        })
128    }
129
130    /// Create with custom API URL
131    pub fn with_api_url(mut self, url: String) -> Self {
132        self.api_url = Some(url);
133        self
134    }
135
136    /// Sync a single operation to AgentDB
137    pub async fn sync_operation(
138        &self,
139        op: &JJOperation,
140        session_id: &str,
141        agent_id: &str,
142    ) -> Result<()> {
143        if !self.enabled {
144            return Ok(());
145        }
146
147        let episode =
148            AgentDBEpisode::from_operation(op, session_id.to_string(), agent_id.to_string());
149        self.store_episode(&episode).await
150    }
151
152    /// Store an episode in AgentDB
153    pub async fn store_episode(&self, episode: &AgentDBEpisode) -> Result<()> {
154        if !self.enabled {
155            return Ok(());
156        }
157
158        // If MCP client is available, use it for real AgentDB communication (native only)
159        #[cfg(not(target_arch = "wasm32"))]
160        {
161            if let Some(client) = &self.mcp_client {
162                let episode_value = serde_json::to_value(episode)
163                    .map_err(|e| JJError::SerializationError(e.to_string()))?;
164
165                client.store_pattern(episode_value).await?;
166
167                #[cfg(feature = "native")]
168                println!("[agentdb-sync] ✅ Stored episode via MCP: {}", episode.session_id);
169
170                return Ok(());
171            }
172        }
173
174        // Fallback: Log to console/file
175        let episode_json = serde_json::to_string_pretty(episode)
176            .map_err(|e| JJError::SerializationError(e.to_string()))?;
177
178        #[cfg(feature = "native")]
179        {
180            println!("[agentdb-sync] Would store episode:");
181            println!("{}", episode_json);
182
183            // Optionally write to file for later batch import
184            if let Ok(path) = std::env::var("AGENTDB_SYNC_FILE") {
185                use std::io::Write;
186                if let Ok(mut file) = std::fs::OpenOptions::new()
187                    .create(true)
188                    .append(true)
189                    .open(&path)
190                {
191                    writeln!(file, "{}", episode_json).ok();
192                }
193            }
194        }
195
196        #[cfg(target_arch = "wasm32")]
197        {
198            web_sys::console::log_1(&format!("[agentdb-sync] {}", episode_json).into());
199        }
200
201        Ok(())
202    }
203
204    /// Query similar operations from AgentDB
205    pub async fn query_similar_operations(
206        &self,
207        task: &str,
208        limit: usize,
209    ) -> Result<Vec<AgentDBEpisode>> {
210        if !self.enabled {
211            return Ok(vec![]);
212        }
213
214        // If MCP client is available, use it for real AgentDB queries (native only)
215        #[cfg(not(target_arch = "wasm32"))]
216        {
217            if let Some(client) = &self.mcp_client {
218                let result = client.search_patterns(task.to_string(), limit).await?;
219
220                // Parse response into episodes
221                let episodes: Vec<AgentDBEpisode> = serde_json::from_value(result)
222                    .map_err(|e| JJError::SerializationError(format!("Failed to parse episodes: {}", e)))?;
223
224                #[cfg(feature = "native")]
225                println!("[agentdb-sync] ✅ Found {} similar episodes via MCP", episodes.len());
226
227                return Ok(episodes);
228            }
229        }
230
231        // Fallback: Log and return empty
232        #[cfg(feature = "native")]
233        {
234            println!(
235                "[agentdb-sync] Would query similar operations for: {}",
236                task
237            );
238            println!("[agentdb-sync] Limit: {}", limit);
239        }
240
241        #[cfg(target_arch = "wasm32")]
242        {
243            web_sys::console::log_1(
244                &format!("[agentdb-sync] Would query: {} (limit: {})", task, limit).into(),
245            );
246        }
247
248        Ok(vec![])
249    }
250
251    /// Get statistics for operations related to a task
252    pub async fn get_task_statistics(&self, task_pattern: &str) -> Result<TaskStatistics> {
253        if !self.enabled {
254            return Ok(TaskStatistics::default());
255        }
256
257        // If MCP client is available, use it for real AgentDB statistics (native only)
258        #[cfg(not(target_arch = "wasm32"))]
259        {
260            if let Some(client) = &self.mcp_client {
261                let result = client.get_pattern_stats(task_pattern.to_string(), 10).await?;
262
263                // Parse response into statistics
264                let stats: TaskStatistics = serde_json::from_value(result)
265                    .map_err(|e| JJError::SerializationError(format!("Failed to parse stats: {}", e)))?;
266
267                #[cfg(feature = "native")]
268                println!("[agentdb-sync] ✅ Retrieved statistics via MCP for: {}", task_pattern);
269
270                return Ok(stats);
271            }
272        }
273
274        // Fallback: Log and return default
275        #[cfg(feature = "native")]
276        {
277            println!(
278                "[agentdb-sync] Would get statistics for pattern: {}",
279                task_pattern
280            );
281        }
282
283        Ok(TaskStatistics::default())
284    }
285
286    /// Batch sync multiple operations
287    pub async fn batch_sync_operations(
288        &self,
289        operations: &[(JJOperation, String, String)], // (operation, session_id, agent_id)
290    ) -> Result<()> {
291        if !self.enabled {
292            return Ok(());
293        }
294
295        for (op, session_id, agent_id) in operations {
296            self.sync_operation(op, session_id, agent_id).await?;
297        }
298
299        Ok(())
300    }
301
302    /// Check if sync is enabled
303    pub fn is_enabled(&self) -> bool {
304        self.enabled
305    }
306}
307
308/// Statistics for task operations
309#[derive(Debug, Clone, Default, Serialize, Deserialize)]
310pub struct TaskStatistics {
311    /// Total number of operations
312    pub total_operations: usize,
313    /// Number of successful operations
314    pub successful_operations: usize,
315    /// Number of failed operations
316    pub failed_operations: usize,
317    /// Average reward score
318    pub average_reward: f64,
319    /// Average latency in milliseconds
320    pub average_latency_ms: Option<f64>,
321    /// Total tokens used
322    pub total_tokens: Option<u64>,
323    /// Common critique patterns
324    pub common_critiques: Vec<String>,
325}
326
327impl TaskStatistics {
328    /// Calculate success rate
329    pub fn success_rate(&self) -> f64 {
330        if self.total_operations == 0 {
331            return 0.0;
332        }
333        self.successful_operations as f64 / self.total_operations as f64
334    }
335
336    /// Calculate failure rate
337    pub fn failure_rate(&self) -> f64 {
338        1.0 - self.success_rate()
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345    use crate::OperationType;
346
347    #[test]
348    fn test_episode_creation() {
349        let op = JJOperation::builder()
350            .operation_id("test-op".to_string())
351            .operation_type(OperationType::Describe)
352            .command("Test operation".to_string())
353            .user("test-user".to_string())
354            .hostname("localhost".to_string())
355            .build();
356
357        let episode =
358            AgentDBEpisode::from_operation(&op, "session-001".to_string(), "agent-001".to_string());
359
360        assert_eq!(episode.session_id, "session-001");
361        assert_eq!(episode.agent_id, "agent-001");
362        assert_eq!(episode.task, "Test operation");
363        assert!(episode.success);
364        assert_eq!(episode.reward, 1.0);
365    }
366
367    #[test]
368    fn test_episode_builder() {
369        let op = JJOperation::builder()
370            .operation_id("test-op".to_string())
371            .operation_type(OperationType::Describe)
372            .command("Test operation".to_string())
373            .user("test-user".to_string())
374            .hostname("localhost".to_string())
375            .build();
376
377        let episode =
378            AgentDBEpisode::from_operation(&op, "session-001".to_string(), "agent-001".to_string())
379                .with_input("input context".to_string())
380                .with_output("output result".to_string())
381                .with_critique("good work".to_string())
382                .with_success(true, 0.95)
383                .with_metrics(1500, 250);
384
385        assert_eq!(episode.input.unwrap(), "input context");
386        assert_eq!(episode.output.unwrap(), "output result");
387        assert_eq!(episode.critique.unwrap(), "good work");
388        assert_eq!(episode.reward, 0.95);
389        assert_eq!(episode.latency_ms.unwrap(), 1500);
390        assert_eq!(episode.tokens_used.unwrap(), 250);
391    }
392
393    #[test]
394    fn test_task_statistics() {
395        let stats = TaskStatistics {
396            total_operations: 100,
397            successful_operations: 85,
398            failed_operations: 15,
399            average_reward: 0.85,
400            average_latency_ms: Some(1200.0),
401            total_tokens: Some(50000),
402            common_critiques: vec!["needs optimization".to_string()],
403        };
404
405        assert!((stats.success_rate() - 0.85).abs() < 0.001);
406        assert!((stats.failure_rate() - 0.15).abs() < 0.001);
407    }
408
409    #[tokio::test]
410    async fn test_agentdb_sync_creation() {
411        let sync = AgentDBSync::new(true);
412        assert!(sync.is_enabled());
413
414        let sync = AgentDBSync::new(false);
415        assert!(!sync.is_enabled());
416    }
417
418    #[tokio::test]
419    async fn test_sync_disabled() {
420        let sync = AgentDBSync::new(false);
421        let op = JJOperation::builder()
422            .operation_id("test-op".to_string())
423            .operation_type(OperationType::Describe)
424            .command("Test operation".to_string())
425            .user("test-user".to_string())
426            .hostname("localhost".to_string())
427            .build();
428
429        let result = sync.sync_operation(&op, "session-001", "agent-001").await;
430        assert!(result.is_ok());
431    }
432}