use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::process::Command;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crate::error::{Result, RuvLLMError};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaudeFlowBridgeConfig {
pub cli_command: String,
pub patterns_namespace: String,
pub tasks_namespace: String,
pub agents_namespace: String,
pub enable_cache: bool,
pub cache_ttl_seconds: i64,
pub timeout_ms: u64,
pub enable_hive_sync: bool,
}
impl Default for ClaudeFlowBridgeConfig {
fn default() -> Self {
Self {
cli_command: "npx @claude-flow/cli@latest".to_string(),
patterns_namespace: "patterns".to_string(),
tasks_namespace: "tasks".to_string(),
agents_namespace: "agents".to_string(),
enable_cache: true,
cache_ttl_seconds: 300, timeout_ms: 30_000, enable_hive_sync: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClaudeFlowPattern {
pub key: String,
pub value: String,
pub namespace: String,
pub tags: Vec<String>,
pub metadata: HashMap<String, serde_json::Value>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResult {
pub patterns_synced: usize,
pub tasks_synced: usize,
pub duration_ms: u64,
pub errors: Vec<String>,
pub synced_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BridgeStats {
pub stores: u64,
pub searches: u64,
pub successes: u64,
pub failures: u64,
pub cache_hits: u64,
pub syncs: u64,
}
#[derive(Debug, Default)]
struct StatsInternal {
stores: AtomicU64,
searches: AtomicU64,
successes: AtomicU64,
failures: AtomicU64,
cache_hits: AtomicU64,
syncs: AtomicU64,
}
#[derive(Debug, Clone)]
struct CachedSearch {
results: Vec<ClaudeFlowPattern>,
cached_at: DateTime<Utc>,
}
pub struct ClaudeFlowMemoryBridge {
config: ClaudeFlowBridgeConfig,
search_cache: Arc<RwLock<HashMap<String, CachedSearch>>>,
stats: StatsInternal,
last_sync: Arc<RwLock<Option<DateTime<Utc>>>>,
}
impl ClaudeFlowMemoryBridge {
pub fn new(config: ClaudeFlowBridgeConfig) -> Self {
Self {
config,
search_cache: Arc::new(RwLock::new(HashMap::new())),
stats: StatsInternal::default(),
last_sync: Arc::new(RwLock::new(None)),
}
}
pub fn store_pattern(
&self,
key: &str,
value: &str,
namespace: Option<&str>,
tags: Option<Vec<String>>,
) -> Result<()> {
self.stats.stores.fetch_add(1, Ordering::SeqCst);
let ns = namespace.unwrap_or(&self.config.patterns_namespace);
let mut args = vec![
"memory".to_string(),
"store".to_string(),
"--key".to_string(),
key.to_string(),
"--value".to_string(),
value.to_string(),
"--namespace".to_string(),
ns.to_string(),
];
if let Some(tag_list) = tags {
if !tag_list.is_empty() {
args.push("--tags".to_string());
args.push(tag_list.join(","));
}
}
self.execute_cli(&args)?;
self.stats.successes.fetch_add(1, Ordering::SeqCst);
self.invalidate_cache(ns);
Ok(())
}
pub fn search_patterns(
&self,
query: &str,
namespace: Option<&str>,
limit: Option<usize>,
) -> Result<Vec<ClaudeFlowPattern>> {
self.stats.searches.fetch_add(1, Ordering::SeqCst);
let ns = namespace.unwrap_or(&self.config.patterns_namespace);
let cache_key = format!("{}:{}:{}", ns, query, limit.unwrap_or(10));
if self.config.enable_cache {
let cache = self.search_cache.read();
if let Some(cached) = cache.get(&cache_key) {
let age = Utc::now() - cached.cached_at;
if age.num_seconds() < self.config.cache_ttl_seconds {
self.stats.cache_hits.fetch_add(1, Ordering::SeqCst);
self.stats.successes.fetch_add(1, Ordering::SeqCst);
return Ok(cached.results.clone());
}
}
}
let mut args = vec![
"memory".to_string(),
"search".to_string(),
"--query".to_string(),
query.to_string(),
"--namespace".to_string(),
ns.to_string(),
];
if let Some(lim) = limit {
args.push("--limit".to_string());
args.push(lim.to_string());
}
let output = self.execute_cli(&args)?;
let patterns = self.parse_search_results(&output, ns)?;
self.stats.successes.fetch_add(1, Ordering::SeqCst);
if self.config.enable_cache {
let mut cache = self.search_cache.write();
cache.insert(
cache_key,
CachedSearch {
results: patterns.clone(),
cached_at: Utc::now(),
},
);
}
Ok(patterns)
}
pub fn retrieve_pattern(
&self,
key: &str,
namespace: Option<&str>,
) -> Result<Option<ClaudeFlowPattern>> {
let ns = namespace.unwrap_or(&self.config.patterns_namespace);
let args = vec![
"memory".to_string(),
"retrieve".to_string(),
"--key".to_string(),
key.to_string(),
"--namespace".to_string(),
ns.to_string(),
];
let output = self.execute_cli(&args)?;
if output.trim().is_empty() || output.contains("not found") {
return Ok(None);
}
let pattern = ClaudeFlowPattern {
key: key.to_string(),
value: output.trim().to_string(),
namespace: ns.to_string(),
tags: vec![],
metadata: HashMap::new(),
created_at: Utc::now(),
};
Ok(Some(pattern))
}
pub fn delete_pattern(&self, key: &str, namespace: Option<&str>) -> Result<bool> {
let ns = namespace.unwrap_or(&self.config.patterns_namespace);
let args = vec![
"memory".to_string(),
"delete".to_string(),
"--key".to_string(),
key.to_string(),
"--namespace".to_string(),
ns.to_string(),
];
self.execute_cli(&args)?;
self.invalidate_cache(ns);
Ok(true)
}
pub fn sync_with_hive(&self) -> Result<SyncResult> {
if !self.config.enable_hive_sync {
return Ok(SyncResult {
patterns_synced: 0,
tasks_synced: 0,
duration_ms: 0,
errors: vec!["Hive sync disabled".to_string()],
synced_at: Utc::now(),
});
}
self.stats.syncs.fetch_add(1, Ordering::SeqCst);
let start = std::time::Instant::now();
let mut errors = Vec::new();
let mut patterns_synced = 0;
let mut tasks_synced = 0;
match self.execute_cli(&[
"hive-mind".to_string(),
"memory".to_string(),
"--action".to_string(),
"list".to_string(),
]) {
Ok(output) => {
patterns_synced = output.lines().count();
}
Err(e) => {
errors.push(format!("Pattern sync failed: {}", e));
}
}
match self.execute_cli(&["task".to_string(), "list".to_string()]) {
Ok(output) => {
tasks_synced = output.lines().filter(|l| !l.is_empty()).count();
}
Err(e) => {
errors.push(format!("Task sync failed: {}", e));
}
}
let duration = start.elapsed();
let now = Utc::now();
*self.last_sync.write() = Some(now);
Ok(SyncResult {
patterns_synced,
tasks_synced,
duration_ms: duration.as_millis() as u64,
errors,
synced_at: now,
})
}
pub fn get_routing_suggestion(&self, task: &str) -> Result<Option<String>> {
let args = vec![
"hooks".to_string(),
"route".to_string(),
"--task".to_string(),
task.to_string(),
];
let output = self.execute_cli(&args)?;
if output.trim().is_empty() {
return Ok(None);
}
if let Some(line) = output.lines().find(|l| l.contains("Recommended agent")) {
return Ok(Some(line.to_string()));
}
Ok(Some(output.trim().to_string()))
}
pub fn record_outcome(&self, task_id: &str, success: bool, quality: Option<f32>) -> Result<()> {
let mut args = vec![
"hooks".to_string(),
"post-task".to_string(),
"--task-id".to_string(),
task_id.to_string(),
"--success".to_string(),
success.to_string(),
];
if let Some(q) = quality {
args.push("--quality".to_string());
args.push(q.to_string());
}
self.execute_cli(&args)?;
Ok(())
}
pub fn stats(&self) -> BridgeStats {
BridgeStats {
stores: self.stats.stores.load(Ordering::SeqCst),
searches: self.stats.searches.load(Ordering::SeqCst),
successes: self.stats.successes.load(Ordering::SeqCst),
failures: self.stats.failures.load(Ordering::SeqCst),
cache_hits: self.stats.cache_hits.load(Ordering::SeqCst),
syncs: self.stats.syncs.load(Ordering::SeqCst),
}
}
pub fn last_sync(&self) -> Option<DateTime<Utc>> {
*self.last_sync.read()
}
pub fn clear_cache(&self) {
self.search_cache.write().clear();
}
fn invalidate_cache(&self, namespace: &str) {
let mut cache = self.search_cache.write();
cache.retain(|k, _| !k.starts_with(&format!("{}:", namespace)));
}
fn validate_cli_arg(arg: &str) -> Result<&str> {
const FORBIDDEN: &[char] = &[
'$', ';', '|', '&', '`', '\n', '\r', '\\', '"', '\'', '<', '>', '(', ')', '{', '}',
'[', ']', '*', '?', '!', '#',
];
if arg.chars().any(|c| FORBIDDEN.contains(&c)) {
return Err(RuvLLMError::InvalidOperation(format!(
"Invalid character in CLI argument: {}",
arg
)));
}
if arg.starts_with("--")
&& arg.len() > 2
&& !arg[2..]
.chars()
.next()
.map(|c| c.is_alphanumeric())
.unwrap_or(false)
{
return Err(RuvLLMError::InvalidOperation(
"Invalid CLI argument format".to_string(),
));
}
Ok(arg)
}
fn execute_cli(&self, args: &[String]) -> Result<String> {
let cli_parts: Vec<&str> = self.config.cli_command.split_whitespace().collect();
if cli_parts.is_empty() {
self.stats.failures.fetch_add(1, Ordering::SeqCst);
return Err(RuvLLMError::Config("Empty CLI command".to_string()));
}
for arg in args {
Self::validate_cli_arg(arg).map_err(|e| {
self.stats.failures.fetch_add(1, Ordering::SeqCst);
e
})?;
}
let program = cli_parts[0];
let mut cmd = Command::new(program);
for part in &cli_parts[1..] {
cmd.arg(part);
}
for arg in args {
cmd.arg(arg);
}
let output = cmd.output().map_err(|e| {
self.stats.failures.fetch_add(1, Ordering::SeqCst);
RuvLLMError::Io(e)
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
self.stats.failures.fetch_add(1, Ordering::SeqCst);
return Err(RuvLLMError::InvalidOperation(format!(
"CLI command failed: {}",
stderr
)));
}
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
Ok(stdout)
}
fn parse_search_results(
&self,
output: &str,
namespace: &str,
) -> Result<Vec<ClaudeFlowPattern>> {
let mut patterns = Vec::new();
if let Ok(json_results) = serde_json::from_str::<Vec<serde_json::Value>>(output) {
for item in json_results {
let pattern = ClaudeFlowPattern {
key: item
.get("key")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
value: item
.get("value")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
namespace: namespace.to_string(),
tags: item
.get("tags")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str())
.map(String::from)
.collect()
})
.unwrap_or_default(),
metadata: HashMap::new(),
created_at: Utc::now(),
};
patterns.push(pattern);
}
} else {
for line in output.lines() {
if line.trim().is_empty() {
continue;
}
if let Some(pos) = line.find(':') {
let key = line[..pos].trim().to_string();
let value = line[pos + 1..].trim().to_string();
patterns.push(ClaudeFlowPattern {
key,
value,
namespace: namespace.to_string(),
tags: vec![],
metadata: HashMap::new(),
created_at: Utc::now(),
});
}
}
}
Ok(patterns)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bridge_creation() {
let config = ClaudeFlowBridgeConfig::default();
let bridge = ClaudeFlowMemoryBridge::new(config);
assert_eq!(bridge.stats().stores, 0);
}
#[test]
fn test_config_defaults() {
let config = ClaudeFlowBridgeConfig::default();
assert_eq!(config.patterns_namespace, "patterns");
assert!(config.enable_cache);
assert!(config.enable_hive_sync);
}
#[test]
fn test_cache_invalidation() {
let config = ClaudeFlowBridgeConfig::default();
let bridge = ClaudeFlowMemoryBridge::new(config);
{
let mut cache = bridge.search_cache.write();
cache.insert(
"patterns:test:10".to_string(),
CachedSearch {
results: vec![],
cached_at: Utc::now(),
},
);
cache.insert(
"tasks:test:10".to_string(),
CachedSearch {
results: vec![],
cached_at: Utc::now(),
},
);
}
assert_eq!(bridge.search_cache.read().len(), 2);
bridge.invalidate_cache("patterns");
assert_eq!(bridge.search_cache.read().len(), 1);
assert!(bridge.search_cache.read().contains_key("tasks:test:10"));
}
#[test]
fn test_clear_cache() {
let config = ClaudeFlowBridgeConfig::default();
let bridge = ClaudeFlowMemoryBridge::new(config);
{
let mut cache = bridge.search_cache.write();
cache.insert(
"test:key".to_string(),
CachedSearch {
results: vec![],
cached_at: Utc::now(),
},
);
}
assert_eq!(bridge.search_cache.read().len(), 1);
bridge.clear_cache();
assert_eq!(bridge.search_cache.read().len(), 0);
}
#[test]
fn test_parse_search_results_json() {
let config = ClaudeFlowBridgeConfig::default();
let bridge = ClaudeFlowMemoryBridge::new(config);
let json_output = r#"[
{"key": "pattern-1", "value": "value-1", "tags": ["rust"]},
{"key": "pattern-2", "value": "value-2", "tags": []}
]"#;
let results = bridge
.parse_search_results(json_output, "patterns")
.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].key, "pattern-1");
assert_eq!(results[0].tags, vec!["rust"]);
}
#[test]
fn test_parse_search_results_text() {
let config = ClaudeFlowBridgeConfig::default();
let bridge = ClaudeFlowMemoryBridge::new(config);
let text_output = "key1: value1\nkey2: value2\n";
let results = bridge
.parse_search_results(text_output, "patterns")
.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].key, "key1");
assert_eq!(results[0].value, "value1");
}
#[test]
fn test_sync_result_creation() {
let result = SyncResult {
patterns_synced: 10,
tasks_synced: 5,
duration_ms: 100,
errors: vec![],
synced_at: Utc::now(),
};
assert_eq!(result.patterns_synced, 10);
assert!(result.errors.is_empty());
}
}