use crate::engine::{Engine, Task};
use crate::event::Event;
use crate::memory::{Fact, Memory};
use std::sync::Arc;
use tokio::sync::mpsc;
/// Stateless helper that distills a session's event stream into durable
/// user-preference facts (languages, frameworks, working style).
pub struct Distiller;

impl Distiller {
    /// Path suffixes and the language label each one implies.
    const LANGUAGE_SUFFIXES: [(&'static str, &'static str); 7] = [
        (".rs", "Rust"),
        (".ts", "TypeScript"),
        (".tsx", "TypeScript"),
        (".py", "Python"),
        (".go", "Go"),
        (".js", "JavaScript"),
        (".jsx", "JavaScript"),
    ];

    /// Manifest names searched for in tool-call content, and the framework
    /// label each one implies.
    const FRAMEWORK_MARKERS: [(&'static str, &'static str); 3] = [
        ("Cargo.toml", "Rust/Cargo"),
        ("package.json", "Node.js"),
        ("go.mod", "Go modules"),
    ];

    /// Scans `events` for hints and stores the resulting facts in `memory`.
    ///
    /// * `Event::ToolUseProposed`: the string argument `path` is matched by
    ///   file suffix to a language; the string argument `content` is searched
    ///   for manifest file names to infer a framework.
    /// * `Event::ThinkingDelta`: the text is searched (case-sensitively) for
    ///   "refactor", "test" and "TDD" to infer style hints.
    ///
    /// Facts are written under the keys `user:language`, `user:framework` and
    /// `user:style`, stamped with today's UTC date (`%Y-%m-%d`). A fact is
    /// skipped when `memory` already holds the same `(key, value)` pair.
    /// `_task_description` is currently unused.
    pub async fn distill(memory: &Arc<dyn Memory>, events: &[Event], _task_description: &str) {
        let mut lang_hints: Vec<String> = Vec::new();
        let mut framework_hints: Vec<String> = Vec::new();
        let mut style_hints: Vec<String> = Vec::new();

        for event in events {
            match event {
                Event::ToolUseProposed { args, .. } => {
                    if let Some(path) = args.get("path").and_then(|v| v.as_str()) {
                        for &(suffix, language) in Self::LANGUAGE_SUFFIXES.iter() {
                            if path.ends_with(suffix) {
                                lang_hints.push(language.to_string());
                            }
                        }
                    }
                    if let Some(content) = args.get("content").and_then(|v| v.as_str()) {
                        for &(marker, framework) in Self::FRAMEWORK_MARKERS.iter() {
                            if content.contains(marker) {
                                framework_hints.push(framework.to_string());
                            }
                        }
                    }
                }
                Event::ThinkingDelta { text, .. } => {
                    if text.contains("refactor") {
                        style_hints.push("prefers refactoring".to_string());
                    }
                    if text.contains("test") || text.contains("TDD") {
                        style_hints.push("test-driven".to_string());
                    }
                }
                _ => {}
            }
        }

        // Collapse repeated hints so each value yields at most one fact.
        for hints in [&mut lang_hints, &mut framework_hints, &mut style_hints] {
            hints.sort();
            hints.dedup();
        }

        let extracted = lang_hints.len() + framework_hints.len() + style_hints.len();

        // One timestamp for the whole batch keeps created_at/updated_at
        // consistent even if the call straddles midnight UTC.
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();

        let existing = memory.all_facts();
        // Compare on (key, value): several values legitimately share one key
        // (e.g. two languages), so a key-only check would permanently block
        // every value discovered after the first session.
        let known: std::collections::HashSet<(&str, &str)> = existing
            .iter()
            .map(|f| (f.key.as_str(), f.value.as_str()))
            .collect();

        let candidates = lang_hints
            .iter()
            .map(|value| ("user:language", value))
            .chain(framework_hints.iter().map(|value| ("user:framework", value)))
            .chain(style_hints.iter().map(|value| ("user:style", value)));

        for (key, value) in candidates {
            if known.contains(&(key, value.as_str())) {
                continue;
            }
            // Best-effort write: the result of `remember` is deliberately
            // discarded, as distillation must not fail the session.
            let _ = memory.remember(Fact {
                id: uuid::Uuid::new_v4().to_string(),
                key: key.into(),
                value: value.clone(),
                created_at: today.clone(),
                updated_at: today.clone(),
            });
        }

        // Style-only sessions are reported too; the count includes them.
        if extracted > 0 {
            tracing::info!("Distiller: extracted {} facts from session", extracted);
        }
    }
}
#[derive(Debug, Clone)]
pub struct Embeddings {
pub vectors: Vec<(String, Vec<f64>)>,
dimensions: usize,
}
impl Embeddings {
pub const DEFAULT_DIMENSIONS: usize = 512;
pub fn new() -> Self {
Self {
vectors: Vec::new(),
dimensions: Self::DEFAULT_DIMENSIONS,
}
}
pub fn with_dimensions(dimensions: usize) -> Self {
Self {
vectors: Vec::new(),
dimensions: dimensions.max(16),
}
}
pub fn embed(&self, text: &str) -> Vec<f64> {
embed_with_dimensions(text, self.dimensions)
}
pub fn add(&mut self, text: &str) {
let clean = text.trim();
if clean.is_empty() {
return;
}
self.vectors.push((clean.to_string(), self.embed(clean)));
}
pub fn add_many<I, S>(&mut self, texts: I)
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
for text in texts {
self.add(text.as_ref());
}
}
pub fn search(&self, query: &str, k: usize) -> Vec<String> {
self.search_scored(query, k)
.into_iter()
.map(|(_, text)| text)
.collect()
}
pub fn search_scored(&self, query: &str, k: usize) -> Vec<(f64, String)> {
if k == 0 {
return Vec::new();
}
let q_embed = self.embed(query);
let mut scored: Vec<(f64, usize, &str)> = self
.vectors
.iter()
.enumerate()
.map(|(idx, (text, emb))| (cosine_sim(&q_embed, emb), idx, text.as_str()))
.collect();
scored.sort_by(|a, b| {
b.0.partial_cmp(&a.0)
.unwrap_or(std::cmp::Ordering::Equal)
.then(a.1.cmp(&b.1))
});
scored
.into_iter()
.take(k)
.filter(|(score, _, _)| *score > 0.0)
.map(|(score, _, text)| (score, text.to_string()))
.collect()
}
pub fn save_to_path(&self, path: impl AsRef<std::path::Path>) -> anyhow::Result<()> {
let snapshot = EmbeddingsSnapshot {
dimensions: self.dimensions,
texts: self.vectors.iter().map(|(text, _)| text.clone()).collect(),
};
let json = serde_json::to_string_pretty(&snapshot)?;
if let Some(parent) = path.as_ref().parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(path, json)?;
Ok(())
}
pub fn load_from_path(path: impl AsRef<std::path::Path>) -> anyhow::Result<Self> {
let json = std::fs::read_to_string(path)?;
let snapshot: EmbeddingsSnapshot = serde_json::from_str(&json)?;
let mut index = Self::with_dimensions(snapshot.dimensions);
index.add_many(snapshot.texts);
Ok(index)
}
}
impl Default for Embeddings {
fn default() -> Self {
Self::new()
}
}
/// Serialized form of an `Embeddings` index: the vector width and the raw
/// texts only. Vectors are not stored; `Embeddings::load_from_path`
/// re-embeds each text.
#[derive(serde::Serialize, serde::Deserialize)]
struct EmbeddingsSnapshot {
// Width the index was configured with when it was saved.
dimensions: usize,
// Stored texts, in the order they appear in the index.
texts: Vec<String>,
}
fn embed_with_dimensions(text: &str, dimensions: usize) -> Vec<f64> {
let mut vector = vec![0.0; dimensions.max(16)];
let tokens = tokenize(text);
for token in &tokens {
add_feature(&mut vector, token, 1.0);
}
for pair in tokens.windows(2) {
add_feature(&mut vector, &format!("{}__{}", pair[0], pair[1]), 1.35);
}
for value in &mut vector {
if *value != 0.0 {
*value = value.signum() * value.abs().ln_1p();
}
}
normalize(&mut vector);
vector
}
/// Splits `text` into lowercase tokens made of consecutive alphanumeric
/// characters; every other character acts as a separator and is dropped.
fn tokenize(text: &str) -> Vec<String> {
    text.split(|ch: char| !ch.is_alphanumeric())
        .filter(|run| !run.is_empty())
        .map(|run| run.chars().flat_map(char::to_lowercase).collect::<String>())
        .collect()
}
/// Adds `weight` to the slot of `vector` selected by hashing `feature`.
///
/// The slot is the FNV-1a hash modulo the vector length; the hash's top bit
/// chooses the sign (clear: `+weight`, set: `-weight`). An empty `vector` is
/// left untouched.
fn add_feature(vector: &mut [f64], feature: &str, weight: f64) {
    if vector.is_empty() {
        // Nothing to index into; also avoids a modulo-by-zero panic.
        return;
    }
    let hash = fnv1a64(feature.as_bytes());
    // Reduce in u64 before narrowing so the chosen slot is the same on
    // 32-bit and 64-bit targets (a plain `as usize` would truncate first).
    let idx = (hash % vector.len() as u64) as usize;
    let sign = if hash >> 63 == 0 { 1.0 } else { -1.0 };
    vector[idx] += sign * weight;
}

/// 64-bit FNV-1a hash of `bytes`.
fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    bytes
        .iter()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u64::from(*byte)).wrapping_mul(PRIME))
}
/// Scales `vector` in place to unit Euclidean length.
///
/// The vector is left unchanged unless its length is strictly positive.
fn normalize(vector: &mut [f64]) {
    let sum_of_squares: f64 = vector.iter().map(|v| v * v).sum();
    let length = sum_of_squares.sqrt();
    if length > 0.0 {
        vector.iter_mut().for_each(|component| *component /= length);
    }
}
/// Cosine similarity of `a` and `b` over their common prefix.
///
/// Only the first `min(a.len(), b.len())` components of each slice are used.
/// Returns 0.0 when that prefix is empty or either prefix has zero magnitude.
fn cosine_sim(a: &[f64], b: &[f64]) -> f64 {
    let shared = a.len().min(b.len());
    if shared == 0 {
        return 0.0;
    }
    let (lhs, rhs) = (&a[..shared], &b[..shared]);

    let magnitude = |v: &[f64]| v.iter().map(|x| x * x).sum::<f64>().sqrt();
    let mag_lhs = magnitude(lhs);
    let mag_rhs = magnitude(rhs);
    if mag_lhs == 0.0 || mag_rhs == 0.0 {
        return 0.0;
    }

    let dot: f64 = lhs.iter().zip(rhs.iter()).map(|(x, y)| x * y).sum();
    dot / (mag_lhs * mag_rhs)
}
pub struct ReExecuter {
engine: Arc<Engine>,
}
impl ReExecuter {
pub fn new(engine: Arc<Engine>) -> Self {
Self { engine }
}
pub async fn re_execute(
&self,
transcript: &crate::runtime::recorder::Transcript,
) -> anyhow::Result<crate::event::OutcomeSummary> {
let (tx, _rx) = mpsc::unbounded_channel::<Event>();
let task = Task {
description: transcript.inputs.task.clone(),
context: vec![],
};
self.engine.drive(task, tx).await
}
}
/// Helpers for the OAuth 2.0 device authorization grant (RFC 8628).
pub struct OAuthFlow;

impl OAuthFlow {
    /// Starts a device flow by POSTing `client_id` and `scope` as a form to
    /// `device_endpoint`.
    ///
    /// Returns `(verification_uri, user_code, device_code)`. The verification
    /// URI is read from `verification_uri`, falling back to
    /// `verification_url`. Missing URI or user code come back as empty
    /// strings. `token_endpoint_hint` is accepted but unused.
    ///
    /// # Errors
    /// Fails on transport or JSON decoding errors, or when the response has
    /// no `device_code`.
    pub async fn start_device_flow(
        device_endpoint: &str,
        token_endpoint_hint: &str,
        client_id: &str,
        scope: &str,
    ) -> anyhow::Result<(String, String, String)> {
        let _ = token_endpoint_hint;
        let client = reqwest::Client::new();
        let resp: serde_json::Value = client
            .post(device_endpoint)
            .form(&[("client_id", client_id), ("scope", scope)])
            .send()
            .await?
            .json()
            .await?;

        let verification_uri = resp["verification_uri"]
            .as_str()
            .or_else(|| resp["verification_url"].as_str())
            .unwrap_or("")
            .to_string();
        let user_code = resp["user_code"].as_str().unwrap_or("").to_string();
        let device_code = resp["device_code"].as_str().unwrap_or("").to_string();

        if device_code.is_empty() {
            anyhow::bail!("Device flow start failed — provider response: {}", resp);
        }
        Ok((verification_uri, user_code, device_code))
    }

    /// Polls `token_endpoint` until an access token is issued.
    ///
    /// Polling starts at a 5-second interval. On `authorization_pending` the
    /// current interval is reused; on `slow_down` it grows by 5 seconds for
    /// this and all later polls, as RFC 8628 §3.5 requires. A response with
    /// neither `access_token` nor `error` is retried after 3 seconds.
    ///
    /// # Errors
    /// Fails on transport or JSON decoding errors, on any other `error` value
    /// from the provider, or once more than `timeout_secs` have elapsed
    /// (checked before each request).
    pub async fn poll_token(
        token_endpoint: &str,
        client_id: &str,
        device_code: &str,
        timeout_secs: u64,
    ) -> anyhow::Result<String> {
        use tokio::time::Duration;

        let client = reqwest::Client::new();
        let started = std::time::Instant::now();
        // Compare full durations; comparing truncated whole seconds allowed
        // polling to continue for up to one second past the limit.
        let deadline = Duration::from_secs(timeout_secs);
        let mut interval = Duration::from_secs(5);

        loop {
            if started.elapsed() > deadline {
                anyhow::bail!("OAuth timed out after {}s", timeout_secs);
            }
            let resp: serde_json::Value = client
                .post(token_endpoint)
                .form(&[
                    ("client_id", client_id),
                    ("device_code", device_code),
                    ("grant_type", "urn:ietf:params:oauth:grant-type:device_code"),
                ])
                .send()
                .await?
                .json()
                .await?;

            if let Some(token) = resp["access_token"].as_str() {
                return Ok(token.to_string());
            }

            match resp["error"].as_str() {
                Some("authorization_pending") => {
                    tokio::time::sleep(interval).await;
                }
                Some("slow_down") => {
                    // The provider asked us to back off: widen the interval
                    // permanently rather than retrying at the same pace.
                    interval += Duration::from_secs(5);
                    tokio::time::sleep(interval).await;
                }
                Some(e) => anyhow::bail!("OAuth error: {}", e),
                None => {
                    tokio::time::sleep(Duration::from_secs(3)).await;
                }
            }
        }
    }
}
/// Download URL for the IBM Plex Mono zip archive, addressed through the
/// `latest` release of the `IBM/plex` GitHub repository.
// NOTE(review): confirm upstream still publishes an asset with this exact file name.
pub const IBM_PLEX_MONO_URL: &str =
"https://github.com/IBM/plex/releases/latest/download/IBM-Plex-Mono.zip";
/// Returns the multi-line help text explaining how to install IBM Plex Mono
/// on Linux, macOS and Windows and how to select it in the terminal.
pub fn ibm_plex_install_instructions() -> String {
    const INSTRUCTIONS: &str = r#"IBM Plex Mono — recommended font for Sparrow TUI.
Install:
Linux: sudo apt install fonts-ibm-plex
macOS: brew install font-ibm-plex
Windows: Download from https://github.com/IBM/plex/releases
Then update your terminal to use "IBM Plex Mono" as the font.
"#;
    String::from(INSTRUCTIONS)
}
/// Line-oriented interactive chat on top of an [`Engine`], keeping the
/// conversation history between turns.
pub struct ChatSession {
    engine: Arc<Engine>,
    /// Messages exchanged so far; passed to the engine as task context.
    history: Vec<crate::provider::Msg>,
    /// Loop guard for [`ChatSession::run_interactive`]; initialised to `true`.
    running: bool,
}

impl ChatSession {
    /// Creates a session with empty history.
    pub fn new(engine: Arc<Engine>) -> Self {
        Self {
            engine,
            history: Vec::new(),
            running: true,
        }
    }

    /// Runs the read–drive–print loop on stdin/stdout.
    ///
    /// Each non-empty line is appended to the history as a user message and
    /// driven through the engine on a spawned task while its events are
    /// rendered: thinking deltas are streamed to stdout, the final outcome is
    /// printed as a status/cost line, and errors go to stderr. The assistant
    /// turn is recorded in history as the outcome status in square brackets.
    ///
    /// The loop ends on `/exit`, `/quit`, or end of input.
    ///
    /// # Errors
    /// Fails on stdin/stdout I/O errors or if the spawned engine task cannot
    /// be joined. Errors returned by the engine itself are printed and the
    /// loop continues.
    pub async fn run_interactive(&mut self) -> anyhow::Result<()> {
        use std::io::{self, Write};

        println!("═══ Sparrow Chat ═══");
        println!("Type your message and press Enter. Type /exit to quit.");
        println!();

        while self.running {
            print!("◆ you › ");
            io::stdout().flush()?;

            let mut line = String::new();
            // `read_line` returns Ok(0) only at end of input (closed pipe,
            // Ctrl-D). Treating that as an empty line would spin forever.
            if io::stdin().read_line(&mut line)? == 0 {
                break;
            }
            let input = line.trim().to_string();
            if input.is_empty() {
                continue;
            }
            if input == "/exit" || input == "/quit" {
                break;
            }

            self.history.push(crate::provider::Msg {
                role: "user".into(),
                content: vec![crate::provider::ContentBlock::Text {
                    text: input.clone(),
                }],
            });

            let (tx, mut rx) = mpsc::unbounded_channel::<Event>();
            let task = Task {
                description: input.clone(),
                context: self.history.clone(),
            };
            let engine = self.engine.clone();
            // Drive on a separate task so events can be rendered while the
            // engine is still running.
            let handle = tokio::spawn(async move { engine.drive(task, tx).await });

            // Ends once the engine has dropped every sender.
            while let Some(event) = rx.recv().await {
                match &event {
                    Event::ThinkingDelta { text, .. } => {
                        print!("{}", text);
                        io::stdout().flush()?;
                    }
                    Event::RunFinished { outcome, .. } => {
                        println!("\n── {} | ${:.4} ──", outcome.status, outcome.cost_usd);
                    }
                    Event::Error { message, .. } => {
                        eprintln!("\nError: {}", message);
                    }
                    _ => {}
                }
            }

            match handle.await? {
                Ok(outcome) => {
                    self.history.push(crate::provider::Msg {
                        role: "assistant".into(),
                        content: vec![crate::provider::ContentBlock::Text {
                            text: format!("[{}]", outcome.status),
                        }],
                    });
                }
                Err(e) => {
                    eprintln!("Chat error: {}", e);
                }
            }
            println!();
        }
        Ok(())
    }
}
/// Declarative description of a multi-step pipeline; convertible to and from
/// TOML through `PipelineConfig::from_toml` / `PipelineConfig::to_toml`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PipelineConfig {
// Pipeline name (the built-in default is "planner-coder-verifier").
pub name: String,
// Steps of the pipeline; `validate` requires at least one.
pub steps: Vec<PipelineStep>,
// Rework limit (3 in the built-in default).
// NOTE(review): how this limit is enforced is not visible in this file — confirm with the consumer.
pub max_reworks: u32,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PipelineStep {
pub role: String,
pub model_preference: Option<String>,
pub prompt_override: Option<String>,
pub depends_on: Vec<String>,
}
impl PipelineConfig {
pub fn default_pipeline() -> Self {
Self {
name: "planner-coder-verifier".into(),
steps: vec![
PipelineStep {
role: "planner".into(),
model_preference: None,
prompt_override: None,
depends_on: vec![],
},
PipelineStep {
role: "coder".into(),
model_preference: None,
prompt_override: None,
depends_on: vec!["planner".into()],
},
PipelineStep {
role: "verifier".into(),
model_preference: None,
prompt_override: None,
depends_on: vec!["coder".into()],
},
],
max_reworks: 3,
}
}
pub fn validate(&self) -> anyhow::Result<()> {
if self.steps.is_empty() {
anyhow::bail!("Pipeline must have at least one step");
}
for step in &self.steps {
for dep in &step.depends_on {
if !self.steps.iter().any(|s| s.role == *dep) {
anyhow::bail!("Step '{}' depends on unknown role '{}'", step.role, dep);
}
}
}
Ok(())
}
pub fn from_toml(content: &str) -> anyhow::Result<Self> {
Ok(toml::from_str(content)?)
}
pub fn to_toml(&self) -> anyhow::Result<String> {
Ok(toml::to_string_pretty(self)?)
}
}
/// A named, isolated configuration + state bundle stored under
/// `<config>/sparrow/profiles/<name>` and `<state>/sparrow/profiles/<name>`.
pub struct Profile {
    /// Profile name as given by the caller.
    pub name: String,
    /// Directory holding this profile's `config.toml`.
    pub config_dir: std::path::PathBuf,
    /// Directory holding this profile's `profile.db`.
    pub state_dir: std::path::PathBuf,
    /// Configuration resolved by [`Profile::load`].
    pub config: crate::config::Config,
    /// SQLite-backed memory opened from `state_dir/profile.db`.
    pub memory: Arc<dyn Memory>,
}

impl Profile {
    /// Rejects names that are not exactly one ordinary path component, so a
    /// name such as `..`, `a/b` or an absolute path cannot escape the
    /// `profiles` directory when joined onto it.
    fn ensure_valid_profile_name(name: &str) -> anyhow::Result<()> {
        use std::path::{Component, Path};

        let mut components = Path::new(name).components();
        match (components.next(), components.next()) {
            (Some(Component::Normal(part)), None) if part == std::ffi::OsStr::new(name) => Ok(()),
            _ => anyhow::bail!(
                "Invalid profile name '{}': must be a single path component",
                name
            ),
        }
    }

    /// Base directory for Sparrow configuration.
    fn profile_base_config_dir() -> std::path::PathBuf {
        dirs::config_dir().unwrap_or_default().join("sparrow")
    }

    /// Base directory for Sparrow state.
    ///
    /// `dirs::state_dir()` yields `None` on macOS and Windows; without a
    /// fallback the state would land in a path relative to the current
    /// working directory, so the local data directory is used instead.
    fn profile_base_state_dir() -> std::path::PathBuf {
        dirs::state_dir()
            .or_else(dirs::data_local_dir)
            .unwrap_or_default()
            .join("sparrow")
    }

    /// Loads the profile `name`, creating its config and state directories if
    /// they do not exist yet.
    ///
    /// Configuration is resolved in this order: the profile's own
    /// `config.toml`, then the shared `sparrow/config.toml`, then built-in
    /// defaults (theme `captain`) pointing at the profile's directories.
    ///
    /// # Errors
    /// Fails on an invalid name, on I/O or TOML parse errors, or if the
    /// profile database cannot be opened.
    pub fn load(name: &str) -> anyhow::Result<Self> {
        Self::ensure_valid_profile_name(name)?;

        let base_config = Self::profile_base_config_dir();
        let config_dir = base_config.join("profiles").join(name);
        let state_dir = Self::profile_base_state_dir().join("profiles").join(name);
        std::fs::create_dir_all(&config_dir)?;
        std::fs::create_dir_all(&state_dir)?;

        let profile_config = config_dir.join("config.toml");
        let shared_config = base_config.join("config.toml");
        // NOTE(review): a config parsed from TOML keeps whatever
        // config_dir/state_dir it deserializes to; only the built-in default
        // is pointed at this profile's directories — confirm this is intended.
        let config: crate::config::Config = if profile_config.exists() {
            toml::from_str(&std::fs::read_to_string(&profile_config)?)?
        } else if shared_config.exists() {
            toml::from_str(&std::fs::read_to_string(&shared_config)?)?
        } else {
            crate::config::Config {
                defaults: Default::default(),
                routing: Default::default(),
                budget: Default::default(),
                providers: Default::default(),
                surfaces: Default::default(),
                skills: Default::default(),
                permissions: Default::default(),
                hooks: Default::default(),
                theme: "captain".into(),
                config_dir: config_dir.clone(),
                state_dir: state_dir.clone(),
                forced_model: None,
            }
        };

        let memory: Arc<dyn Memory> = Arc::new(crate::memory::SqliteMemory::open(
            &state_dir.join("profile.db"),
        )?);

        Ok(Self {
            name: name.to_string(),
            config_dir,
            state_dir,
            config,
            memory,
        })
    }

    /// Creates the directories for profile `name` and, when a shared
    /// `sparrow/config.toml` exists, copies it in as the profile's config.
    ///
    /// # Errors
    /// Fails on an invalid name or on any filesystem error.
    pub fn create(name: &str) -> anyhow::Result<()> {
        Self::ensure_valid_profile_name(name)?;

        let base_config = Self::profile_base_config_dir();
        let config_dir = base_config.join("profiles").join(name);
        std::fs::create_dir_all(&config_dir)?;

        let shared_config = base_config.join("config.toml");
        if shared_config.exists() {
            std::fs::copy(&shared_config, config_dir.join("config.toml"))?;
        }

        let state_dir = Self::profile_base_state_dir().join("profiles").join(name);
        std::fs::create_dir_all(state_dir)?;
        Ok(())
    }

    /// Returns the sorted names of all profile directories. An unreadable or
    /// missing profiles directory yields an empty list; entries whose names
    /// are not valid UTF-8 are skipped.
    pub fn list() -> Vec<String> {
        let profiles_dir = Self::profile_base_config_dir().join("profiles");
        let mut names: Vec<String> = match std::fs::read_dir(&profiles_dir) {
            Ok(entries) => entries
                .flatten()
                .filter(|entry| entry.path().is_dir())
                .filter_map(|entry| entry.file_name().to_str().map(str::to_string))
                .collect(),
            Err(_) => Vec::new(),
        };
        names.sort();
        names
    }
}