aicx 0.6.6

Operator CLI + MCP server: canonical corpus first, optional semantic index second (Claude Code, Codex, Gemini)
Documentation
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, Ordering},
    mpsc,
};
use std::thread;
use std::time::Duration;

#[derive(Debug)]
enum StoreEvent {
    Line(String),
    Done(StoreOutcome),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StoreOutcome {
    Completed,
    Failed,
    Cancelled,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct StoreProgress {
    pub phase: String,
    pub current: u64,
    pub total: Option<u64>,
    pub status: String,
}

impl StoreProgress {
    pub fn ratio(&self) -> f64 {
        let Some(total) = self.total else {
            return 0.0;
        };
        if total == 0 {
            0.0
        } else {
            (self.current as f64 / total as f64).clamp(0.0, 1.0)
        }
    }
}

#[derive(Debug)]
pub struct StoreScreen {
    pub running: bool,
    pub log: Vec<String>,
    pub scroll: usize,
    pub status: String,
    pub hours: u64,
    pub progress: Option<StoreProgress>,
    rx: Option<mpsc::Receiver<StoreEvent>>,
    child: Option<Arc<Mutex<Option<Child>>>>,
    cancel_requested: Option<Arc<AtomicBool>>,
}

impl Default for StoreScreen {
    fn default() -> Self {
        Self {
            running: false,
            log: Vec::new(),
            scroll: 0,
            status: "store range: 48h".to_string(),
            hours: 48,
            progress: None,
            rx: None,
            child: None,
            cancel_requested: None,
        }
    }
}

impl StoreScreen {
    pub fn is_running(&self) -> bool {
        self.running
    }

    pub fn start(&mut self) {
        if self.running {
            self.status = "store run already in flight".to_string();
            return;
        }

        if self.hours == 0 {
            self.hours = 48;
        }

        let Ok(exe) = std::env::current_exe() else {
            self.status = "failed to resolve current aicx executable".to_string();
            return;
        };

        let (event_tx, event_rx) = mpsc::channel();
        self.rx = Some(event_rx);
        let cancel_requested = Arc::new(AtomicBool::new(false));
        self.cancel_requested = Some(cancel_requested.clone());
        self.running = true;
        self.log.clear();
        self.progress = None;
        self.log
            .push(format!("running: aicx store -H {} --emit none", self.hours));
        self.status = "store run started".to_string();

        let hours = self.hours.to_string();
        let child_slot: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(None));
        self.child = Some(child_slot.clone());

        thread::spawn(move || {
            let mut child = match Command::new(exe)
                .arg("store")
                .arg("-H")
                .arg(hours)
                .arg("--emit")
                .arg("none")
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .spawn()
            {
                Ok(child) => child,
                Err(error) => {
                    let _ = event_tx.send(StoreEvent::Line(format!("spawn failed: {error}")));
                    let _ = event_tx.send(StoreEvent::Done(StoreOutcome::Failed));
                    return;
                }
            };

            if let Some(stderr) = child.stderr.take() {
                let tx = event_tx.clone();
                thread::spawn(move || {
                    for line in BufReader::new(stderr).lines().map_while(Result::ok) {
                        let _ = tx.send(StoreEvent::Line(line));
                    }
                });
            }

            if let Some(stdout) = child.stdout.take() {
                let tx = event_tx.clone();
                thread::spawn(move || {
                    for line in BufReader::new(stdout).lines().map_while(Result::ok) {
                        let _ = tx.send(StoreEvent::Line(line));
                    }
                });
            }

            {
                let mut guard = child_slot
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                *guard = Some(child);
            }

            loop {
                let outcome = {
                    let mut guard = child_slot
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    if let Some(child) = guard.as_mut() {
                        match child.try_wait() {
                            Ok(Some(status)) => {
                                *guard = None;
                                if cancel_requested.load(Ordering::SeqCst) {
                                    StoreOutcome::Cancelled
                                } else if status.success() {
                                    StoreOutcome::Completed
                                } else {
                                    StoreOutcome::Failed
                                }
                            }
                            Ok(None) => {
                                drop(guard);
                                thread::sleep(Duration::from_millis(100));
                                continue;
                            }
                            Err(error) => {
                                let _ = event_tx
                                    .send(StoreEvent::Line(format!("wait failed: {error}")));
                                *guard = None;
                                StoreOutcome::Failed
                            }
                        }
                    } else {
                        StoreOutcome::Cancelled
                    }
                };
                let _ = event_tx.send(StoreEvent::Done(outcome));
                break;
            }
        });
    }

    pub fn poll(&mut self) {
        while let Some(event) = self.rx.as_ref().and_then(|rx| rx.try_recv().ok()) {
            match event {
                StoreEvent::Line(line) => {
                    self.update_progress_from_line(&line);
                    self.push_log(line);
                }
                StoreEvent::Done(outcome) => {
                    self.running = false;
                    self.child = None;
                    self.cancel_requested = None;
                    self.status = match outcome {
                        StoreOutcome::Completed => "store run completed".to_string(),
                        StoreOutcome::Failed => "store run failed".to_string(),
                        StoreOutcome::Cancelled => "store run cancelled".to_string(),
                    };
                }
            }
        }
    }

    pub fn cancel(&mut self) -> bool {
        if !self.running {
            return false;
        }

        if let Some(cancel_requested) = &self.cancel_requested {
            cancel_requested.store(true, Ordering::SeqCst);
        }

        let mut killed = false;
        if let Some(child_slot) = &self.child {
            let mut guard = child_slot
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(child) = guard.as_mut() {
                killed = child.kill().is_ok();
            }
        }

        if killed {
            self.status = "store cancel requested; kill signal sent".to_string();
            self.push_log("cancel requested; kill signal sent to store subprocess".to_string());
        } else {
            self.status = "store cancel requested; subprocess already exiting".to_string();
            self.push_log("cancel requested; subprocess already exiting".to_string());
        }
        true
    }

    pub fn cycle_hours(&mut self) {
        const PRESETS: &[u64] = &[4, 24, 48, 168];
        if self.running {
            self.status = "store range is locked while a run is active".to_string();
            return;
        }
        let current = if self.hours == 0 { 48 } else { self.hours };
        let next = PRESETS
            .iter()
            .position(|preset| *preset == current)
            .map(|idx| PRESETS[(idx + 1) % PRESETS.len()])
            .unwrap_or(48);
        self.hours = next;
        self.status = format!("store range set to {next}h");
    }

    pub fn move_log(&mut self, delta: isize) {
        if delta < 0 {
            self.scroll = self.scroll.saturating_sub(delta.unsigned_abs());
        } else {
            self.scroll = self
                .scroll
                .saturating_add(delta as usize)
                .min(self.log.len().saturating_sub(1));
        }
    }

    fn push_log(&mut self, line: String) {
        self.log.push(line);
        self.scroll = self.scroll.min(self.log.len().saturating_sub(1));
    }

    fn update_progress_from_line(&mut self, line: &str) {
        let Some(mut progress) = parse_progress_line(line) else {
            return;
        };
        if progress.total.is_none()
            && let Some(previous) = &self.progress
            && previous.phase == progress.phase
        {
            progress.total = previous.total;
            if matches!(progress.status.as_str(), "ok" | "failed")
                && let Some(total) = progress.total
            {
                progress.current = total;
            }
        }
        self.status = match progress.total {
            Some(total) if total > 0 => format!(
                "{} {} {}/{}",
                progress.phase, progress.status, progress.current, total
            ),
            _ => format!("{} {}", progress.phase, progress.status),
        };
        self.progress = Some(progress);
    }
}

fn parse_progress_line(line: &str) -> Option<StoreProgress> {
    let inner = line.strip_prefix("[aicx][")?.strip_suffix(']')?;
    let mut phase = None;
    let mut event = None;
    let mut status = None;
    let mut current = None;
    let mut total = None;

    for part in inner.split_whitespace() {
        let Some((key, value)) = part.split_once('=') else {
            continue;
        };
        match key {
            "phase" => phase = Some(value.to_string()),
            "event" => event = Some(value.to_string()),
            "status" => status = Some(value.trim_matches('"').to_string()),
            "current" => current = value.parse::<u64>().ok(),
            "total" => total = value.parse::<u64>().ok(),
            _ => {}
        }
    }

    let event = event?;
    let current = match event.as_str() {
        "finish" => total.unwrap_or(current.unwrap_or_default()),
        _ => current.unwrap_or_default(),
    };
    Some(StoreProgress {
        phase: phase?,
        current,
        total,
        status: status.unwrap_or(event),
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_structured_progress_tick() {
        let parsed =
            parse_progress_line("[aicx][phase=chunk event=tick elapsed_ms=42 current=7 total=10]")
                .expect("progress");
        assert_eq!(parsed.phase, "chunk");
        assert_eq!(parsed.current, 7);
        assert_eq!(parsed.total, Some(10));
        assert_eq!(parsed.status, "tick");
        assert!((parsed.ratio() - 0.7).abs() < f64::EPSILON);
    }

    #[test]
    fn parses_structured_progress_finish() {
        let parsed = parse_progress_line(
            "[aicx][phase=steer_sync event=finish status=ok elapsed_ms=9 summary=\"12 docs\"]",
        )
        .expect("progress");
        assert_eq!(parsed.phase, "steer_sync");
        assert_eq!(parsed.current, 0);
        assert_eq!(parsed.total, None);
        assert_eq!(parsed.status, "ok");
    }

    #[test]
    fn ignores_plain_log_lines() {
        assert!(parse_progress_line("  [codex] 12 entries").is_none());
    }
}