Skip to main content

phago_web/
state.rs

1//! Application state for the web server.
2//!
3//! Uses a dedicated thread for Colony operations since Colony contains
4//! trait objects that are not Send+Sync.
5
6use anyhow::Result;
7use phago_runtime::colony::{Colony, ColonyConfig, ColonyEvent, ColonySnapshot, ColonyStats};
8use phago_core::types::Position;
9use std::sync::mpsc;
10use std::thread;
11use tokio::sync::{broadcast, oneshot};
12
13/// Commands sent to the colony worker thread.
14enum ColonyCommand {
15    GetStats(oneshot::Sender<ColonyStats>),
16    GetSnapshot(oneshot::Sender<ColonySnapshot>),
17    RunTicks(u64, oneshot::Sender<Vec<Vec<ColonyEvent>>>),
18    Ingest {
19        title: String,
20        content: String,
21        position: Position,
22        ticks: u64,
23        response: oneshot::Sender<IngestResult>,
24    },
25    Query {
26        query: String,
27        max_results: usize,
28        alpha: f64,
29        response: oneshot::Sender<QueryResult>,
30    },
31}
32
33/// Result of an ingest operation.
34pub struct IngestResult {
35    pub document_id: String,
36    pub nodes_created: usize,
37    pub edges_created: usize,
38    pub tick: u64,
39}
40
41/// Result of a query operation.
42pub struct QueryResult {
43    pub results: Vec<QueryHit>,
44    pub total_nodes: usize,
45    pub total_edges: usize,
46}
47
48pub struct QueryHit {
49    pub label: String,
50    pub score: f64,
51    pub tfidf_score: f64,
52    pub graph_score: f64,
53}
54
55/// Shared application state.
56#[derive(Clone)]
57pub struct AppState {
58    /// Channel to send commands to the colony worker.
59    cmd_tx: mpsc::Sender<ColonyCommand>,
60    /// Broadcast channel for colony events.
61    pub event_tx: broadcast::Sender<ColonyEvent>,
62}
63
64impl AppState {
65    /// Create a new app state, optionally with SQLite persistence.
66    pub fn new(_db_path: Option<String>) -> Result<Self> {
67        let (cmd_tx, cmd_rx) = mpsc::channel();
68        let (event_tx, _) = broadcast::channel(1000);
69        let event_tx_clone = event_tx.clone();
70
71        // Spawn dedicated thread for Colony operations
72        thread::spawn(move || {
73            let mut colony = Colony::from_config(ColonyConfig::default());
74
75            while let Ok(cmd) = cmd_rx.recv() {
76                match cmd {
77                    ColonyCommand::GetStats(response) => {
78                        let _ = response.send(colony.stats());
79                    }
80                    ColonyCommand::GetSnapshot(response) => {
81                        let _ = response.send(colony.snapshot());
82                    }
83                    ColonyCommand::RunTicks(ticks, response) => {
84                        let all_events = colony.run(ticks);
85                        // Broadcast events
86                        for events in &all_events {
87                            for event in events {
88                                let _ = event_tx_clone.send(event.clone());
89                            }
90                        }
91                        let _ = response.send(all_events);
92                    }
93                    ColonyCommand::Ingest { title, content, position, ticks, response } => {
94                        use phago::prelude::Digester;
95
96                        let before_nodes = colony.stats().graph_nodes;
97                        let before_edges = colony.stats().graph_edges;
98
99                        let doc_id = colony.ingest_document(&title, &content, position);
100                        colony.spawn(Box::new(Digester::new(position).with_max_idle(30)));
101
102                        let all_events = colony.run(ticks);
103                        for events in &all_events {
104                            for event in events {
105                                let _ = event_tx_clone.send(event.clone());
106                            }
107                        }
108
109                        let after_nodes = colony.stats().graph_nodes;
110                        let after_edges = colony.stats().graph_edges;
111
112                        let _ = response.send(IngestResult {
113                            document_id: format!("{}", doc_id.0),
114                            nodes_created: after_nodes.saturating_sub(before_nodes),
115                            edges_created: after_edges.saturating_sub(before_edges),
116                            tick: colony.stats().tick,
117                        });
118                    }
119                    ColonyCommand::Query { query, max_results, alpha, response } => {
120                        use phago::rag::{hybrid_query, HybridConfig};
121
122                        let config = HybridConfig {
123                            alpha,
124                            max_results,
125                            candidate_multiplier: 3,
126                        };
127                        let results = hybrid_query(&colony, &query, &config);
128                        let stats = colony.stats();
129
130                        let _ = response.send(QueryResult {
131                            results: results
132                                .into_iter()
133                                .map(|r| QueryHit {
134                                    label: r.label,
135                                    score: r.final_score,
136                                    tfidf_score: r.tfidf_score,
137                                    graph_score: r.graph_score,
138                                })
139                                .collect(),
140                            total_nodes: stats.graph_nodes,
141                            total_edges: stats.graph_edges,
142                        });
143                    }
144                }
145            }
146        });
147
148        Ok(Self { cmd_tx, event_tx })
149    }
150
151    /// Get colony statistics.
152    pub async fn stats(&self) -> ColonyStats {
153        let (tx, rx) = oneshot::channel();
154        let _ = self.cmd_tx.send(ColonyCommand::GetStats(tx));
155        rx.await.unwrap_or_else(|_| ColonyStats {
156            tick: 0,
157            agents_alive: 0,
158            agents_died: 0,
159            total_spawned: 0,
160            graph_nodes: 0,
161            graph_edges: 0,
162            total_signals: 0,
163            documents_total: 0,
164            documents_digested: 0,
165        })
166    }
167
168    /// Get colony snapshot.
169    pub async fn snapshot(&self) -> ColonySnapshot {
170        let (tx, rx) = oneshot::channel();
171        let _ = self.cmd_tx.send(ColonyCommand::GetSnapshot(tx));
172        rx.await.unwrap_or_else(|_| ColonySnapshot {
173            tick: 0,
174            agents: vec![],
175            nodes: vec![],
176            edges: vec![],
177            stats: ColonyStats {
178                tick: 0,
179                agents_alive: 0,
180                agents_died: 0,
181                total_spawned: 0,
182                graph_nodes: 0,
183                graph_edges: 0,
184                total_signals: 0,
185                documents_total: 0,
186                documents_digested: 0,
187            },
188        })
189    }
190
191    /// Run N ticks.
192    pub async fn run(&self, ticks: u64) -> Vec<Vec<ColonyEvent>> {
193        let (tx, rx) = oneshot::channel();
194        let _ = self.cmd_tx.send(ColonyCommand::RunTicks(ticks, tx));
195        rx.await.unwrap_or_default()
196    }
197
198    /// Ingest a document.
199    pub async fn ingest(&self, title: String, content: String, position: Position, ticks: u64) -> IngestResult {
200        let (tx, rx) = oneshot::channel();
201        let _ = self.cmd_tx.send(ColonyCommand::Ingest {
202            title,
203            content,
204            position,
205            ticks,
206            response: tx,
207        });
208        rx.await.unwrap_or_else(|_| IngestResult {
209            document_id: "error".to_string(),
210            nodes_created: 0,
211            edges_created: 0,
212            tick: 0,
213        })
214    }
215
216    /// Query the knowledge graph.
217    pub async fn query(&self, query: String, max_results: usize, alpha: f64) -> QueryResult {
218        let (tx, rx) = oneshot::channel();
219        let _ = self.cmd_tx.send(ColonyCommand::Query {
220            query,
221            max_results,
222            alpha,
223            response: tx,
224        });
225        rx.await.unwrap_or_else(|_| QueryResult {
226            results: vec![],
227            total_nodes: 0,
228            total_edges: 0,
229        })
230    }
231
232    /// Subscribe to events.
233    pub fn subscribe(&self) -> broadcast::Receiver<ColonyEvent> {
234        self.event_tx.subscribe()
235    }
236}