collet 0.1.1

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! Shared knowledge base for inter-agent communication.
//!
//! Thread-safe via `Arc<RwLock>` — reads vastly outnumber writes so multiple
//! agents can query concurrently without blocking.
//!
//! Knowledge retrieval uses the shared BM25 engine (`crate::search`) for
//! relevance-ranked filtering.  Instead of dumping all facts into every agent,
//! `build_relevant_summary` scores each knowledge item against the agent's
//! subtask prompt + target files and injects only the top-K most relevant items.
//!
//! ## Blackboard (Flock mode)
//!
//! `post_to_blackboard` / `read_blackboard` provide a typed key-value store
//! for decentralized inter-agent coordination (Blackboard architecture pattern).
//! Agents post proposals, vote on them, and read current state autonomously.
//!
//! ## Persistence
//!
//! `snapshot` / `restore` serialize the knowledge index to JSON so it survives
//! session restarts.

pub mod blackboard;
pub mod checkpoint;
pub mod inner;
pub mod persistence;
pub mod summary;
pub mod types;
pub mod worker;

#[cfg(test)]
mod tests;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

pub use types::{
    Announcement, BlackboardKind, CheckpointResult, CheckpointTask, ExecutionCheckpoint,
    FileModification, FileSummary, KnowledgeFact, ModificationType, WorkerHandles,
    WorkerInstruction,
};

pub use checkpoint::{clear_checkpoint, load_checkpoint, save_checkpoint};

use inner::{KnowledgeInner, prepare_index_entry};
use types::EntryKind;

/// Handle to the knowledge base shared between agents.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying state rather than a copy of the data.
#[derive(Clone)]
pub struct SharedKnowledge {
    /// Knowledge state behind an async `RwLock`: facts, file read and
    /// modification records, per-agent file access, announcements and the
    /// retrieval index.
    inner: Arc<RwLock<KnowledgeInner>>,
    /// Per-worker control handles for external cancel/extend.
    /// Keyed by a string (presumably the worker ID; confirm in `worker`).
    worker_handles: Arc<RwLock<HashMap<String, WorkerHandles>>>,
    /// Notified when any worker is resumed from paused state.
    /// Allows `wait_for_any` to sleep instead of polling.
    resume_notify: Arc<tokio::sync::Notify>,
}

impl Default for SharedKnowledge {
    fn default() -> Self {
        Self {
            inner: Arc::new(RwLock::new(KnowledgeInner::default())),
            worker_handles: Arc::new(RwLock::new(HashMap::new())),
            resume_notify: Arc::new(tokio::sync::Notify::new()),
        }
    }
}

impl SharedKnowledge {
    pub fn new() -> Self {
        Self::default()
    }

    // ── Facts ────────────────────────────────────────────────────────

    /// Post a discovered fact. Returns the fact ID.
    ///
    /// The fact is indexed (topic followed by content) for BM25 retrieval and
    /// stored under `topic`; storing under an existing topic goes through the
    /// same `insert` call, so the latest fact for a topic is the one kept there.
    ///
    /// A topic of the form `file:<path>` associates the index entry with
    /// `<path>`. Exactly one `file:` prefix is removed, so a topic such as
    /// `file:file:notes.txt` maps to the path `file:notes.txt` instead of
    /// having every repeated prefix stripped.
    pub async fn post_fact(&self, agent_id: &str, topic: &str, content: &str) -> u64 {
        // Expensive tokenization happens outside the write lock.
        let index_text = format!("{topic} {content}");
        let display = format!("- [{agent_id}] {topic}: {content}");
        // `strip_prefix` both tests for the prefix and removes it once;
        // `trim_start_matches` would remove it repeatedly.
        let paths: Vec<String> = topic
            .strip_prefix("file:")
            .map(|path| vec![path.to_string()])
            .unwrap_or_default();
        let prepared = prepare_index_entry(EntryKind::Fact, &index_text, display, paths);

        let mut inner = self.inner.write().await;
        let id = inner.insert_prepared_entry(prepared);

        inner.facts.insert(
            topic.to_string(),
            KnowledgeFact {
                id,
                agent_id: agent_id.to_string(),
                topic: topic.to_string(),
                content: content.to_string(),
                timestamp: Instant::now(),
            },
        );
        id
    }

    /// Return a copy of every stored fact whose topic starts with
    /// `topic_prefix`. An empty prefix matches all facts.
    pub async fn query_facts(&self, topic_prefix: &str) -> Vec<KnowledgeFact> {
        let guard = self.inner.read().await;

        let mut matches = Vec::new();
        for (topic, fact) in guard.facts.iter() {
            if topic.starts_with(topic_prefix) {
                matches.push(fact.clone());
            }
        }
        matches
    }

    // ── File I/O Tracking ────────────────────────────────────────────

    /// Record that `agent_id` read `path`.
    ///
    /// Adds an index entry built from the path and summary, marks the path as
    /// accessed by the agent, and stores the summary as the current read
    /// record for that path.
    pub async fn record_file_read(
        &self,
        agent_id: &str,
        path: &str,
        summary: &str,
        line_count: usize,
    ) {
        // Everything that does not need the shared state is built up front so
        // the write lock is held only for the three insertions below.
        let entry = prepare_index_entry(
            EntryKind::FileRead,
            &format!("{path} {summary}"),
            format!("- `{path}` ({line_count} lines): {summary}"),
            vec![path.to_string()],
        );
        let reader = agent_id.to_string();
        let record = FileSummary {
            path: path.to_string(),
            agent_id: reader.clone(),
            summary: summary.to_string(),
            line_count,
        };

        let mut guard = self.inner.write().await;
        guard.insert_prepared_entry(entry);

        // Track actual file access for this agent.
        let touched = guard.actual_file_access.entry(reader).or_default();
        touched.insert(path.to_string());

        guard.files_read.insert(path.to_string(), record);
    }

    /// Look up the stored read record for `path`.
    ///
    /// Returns a copy of the summary if some agent has recorded a read of the
    /// file, or `None` otherwise.
    pub async fn file_already_read(&self, path: &str) -> Option<FileSummary> {
        let guard = self.inner.read().await;
        let cached: Option<&FileSummary> = guard.files_read.get(path);
        cached.cloned()
    }

    /// Append a modification record for `path` (used for conflict detection).
    ///
    /// The path is also marked as accessed by `agent_id`.
    ///
    /// `content_snapshot` stores a truncated copy of the file after modification,
    /// enabling `CoordinatorResolves` to compare competing versions.
    /// Pass `None` for lightweight recording (e.g. tests).
    pub async fn record_file_modification(
        &self,
        agent_id: &str,
        path: &str,
        mod_type: ModificationType,
        content_hash: u64,
        content_snapshot: Option<String>,
    ) {
        let agent = agent_id.to_string();
        let file = path.to_string();

        let mut guard = self.inner.write().await;

        // Track actual file access for this agent.
        let touched = guard.actual_file_access.entry(agent.clone()).or_default();
        touched.insert(file.clone());

        // The timestamp is taken while the lock is held, as part of the append.
        let history = guard.files_modified.entry(file).or_default();
        history.push(FileModification {
            agent_id: agent,
            modification_type: mod_type,
            content_hash,
            timestamp: Instant::now(),
            content_snapshot,
        });
    }

    /// Return a copy of the modification history recorded for `path`.
    ///
    /// The result is empty when no modification has been recorded.
    pub async fn file_modifications(&self, path: &str) -> Vec<FileModification> {
        let guard = self.inner.read().await;
        match guard.files_modified.get(path) {
            Some(history) => history.clone(),
            None => Vec::new(),
        }
    }

    /// Mark `path` as accessed by `agent_id` at runtime (read or write).
    pub async fn track_file_access(&self, agent_id: &str, path: &str) {
        let mut guard = self.inner.write().await;
        let touched = guard
            .actual_file_access
            .entry(agent_id.to_string())
            .or_default();
        touched.insert(path.to_string());
    }

    /// List the files `agent_id` actually accessed (as opposed to its
    /// predicted target_files).
    ///
    /// Returns an empty list when nothing has been tracked for the agent.
    pub async fn actual_files_for_agent(&self, agent_id: &str) -> Vec<String> {
        let guard = self.inner.read().await;
        match guard.actual_file_access.get(agent_id) {
            Some(paths) => paths.iter().cloned().collect(),
            None => Vec::new(),
        }
    }

    /// Compare predicted target_files against actual file access (diagnostics).
    ///
    /// Returns `(correct, missed, extra)`:
    /// - `correct`: files that were both predicted and accessed,
    /// - `missed`: files that were accessed but not predicted,
    /// - `extra`: files that were predicted but never accessed.
    ///
    /// Duplicate entries in `predicted` are counted once.
    pub async fn file_prediction_accuracy(
        &self,
        agent_id: &str,
        predicted: &[String],
    ) -> (usize, usize, usize) {
        let accessed = self.actual_files_for_agent(agent_id).await;

        let accessed_paths: std::collections::HashSet<&str> =
            accessed.iter().map(String::as_str).collect();
        let predicted_paths: std::collections::HashSet<&str> =
            predicted.iter().map(String::as_str).collect();

        // The overlap is counted once; each one-sided difference is the size
        // of that set minus the overlap.
        let correct = accessed_paths
            .iter()
            .copied()
            .filter(|path| predicted_paths.contains(path))
            .count();
        let missed = accessed_paths.len() - correct;
        let extra = predicted_paths.len() - correct;

        (correct, missed, extra)
    }

    // ── Announcements ────────────────────────────────────────────────

    /// Broadcast `message` from `agent_id` to all agents.
    ///
    /// The message is added to the retrieval index and appended to the
    /// announcement list. Returns the ID assigned to the new entry.
    pub async fn announce(&self, agent_id: &str, message: &str) -> u64 {
        // The index entry is prepared before taking the write lock.
        let entry = prepare_index_entry(
            EntryKind::Announcement,
            message,
            format!("- [{agent_id}] {message}"),
            Vec::new(),
        );

        let mut guard = self.inner.write().await;
        let id = guard.insert_prepared_entry(entry);

        let posted = Announcement {
            id,
            agent_id: agent_id.to_string(),
            message: message.to_string(),
            timestamp: Instant::now(),
        };
        guard.announcements.push(posted);

        id
    }

    /// Return copies of all announcements whose ID is strictly greater than
    /// `since_id`, in stored order.
    pub async fn announcements_since(&self, since_id: u64) -> Vec<Announcement> {
        let guard = self.inner.read().await;

        let mut newer = Vec::new();
        for item in guard.announcements.iter() {
            if item.id > since_id {
                newer.push(item.clone());
            }
        }
        newer
    }

    // ── Conflict Detection ───────────────────────────────────────────

    /// List files modified by more than one distinct agent (conflict
    /// candidates), each paired with a copy of its full modification history.
    pub async fn conflicting_files(&self) -> Vec<(String, Vec<FileModification>)> {
        let guard = self.inner.read().await;

        let mut conflicts = Vec::new();
        for (path, mods) in guard.files_modified.iter() {
            // A history involves several agents exactly when some entry's
            // agent differs from the first entry's agent.
            let contested = match mods.first() {
                Some(first) => mods.iter().any(|m| m.agent_id != first.agent_id),
                None => false,
            };
            if contested {
                conflicts.push((path.clone(), mods.clone()));
            }
        }
        conflicts
    }

    /// Return the path of every file that has a recorded read.
    pub async fn all_files_read(&self) -> Vec<String> {
        let guard = self.inner.read().await;

        let mut paths = Vec::new();
        for path in guard.files_read.keys() {
            paths.push(path.clone());
        }
        paths
    }

    /// Return how many entries the knowledge index currently holds.
    pub async fn index_size(&self) -> usize {
        let guard = self.inner.read().await;
        let entries = guard.index.len();
        entries
    }
}