1use anyhow::Result;
7use phago_runtime::colony::{Colony, ColonyConfig, ColonyEvent, ColonySnapshot, ColonyStats};
8use phago_core::types::Position;
9use std::sync::mpsc;
10use std::thread;
11use tokio::sync::{broadcast, oneshot};
12
13enum ColonyCommand {
15 GetStats(oneshot::Sender<ColonyStats>),
16 GetSnapshot(oneshot::Sender<ColonySnapshot>),
17 RunTicks(u64, oneshot::Sender<Vec<Vec<ColonyEvent>>>),
18 Ingest {
19 title: String,
20 content: String,
21 position: Position,
22 ticks: u64,
23 response: oneshot::Sender<IngestResult>,
24 },
25 Query {
26 query: String,
27 max_results: usize,
28 alpha: f64,
29 response: oneshot::Sender<QueryResult>,
30 },
31}
32
/// Outcome of ingesting a single document into the colony.
///
/// When the colony worker cannot be reached, `AppState::ingest` returns this
/// struct with `document_id` set to `"error"` and every counter set to zero.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngestResult {
    /// String form of the identifier the colony assigned to the document.
    pub document_id: String,
    /// Growth of the graph's node count over the ingest run (never negative).
    pub nodes_created: usize,
    /// Growth of the graph's edge count over the ingest run (never negative).
    pub edges_created: usize,
    /// Colony tick reported once the ingest run finished.
    pub tick: u64,
}
40
/// Outcome of a hybrid query against the colony.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryResult {
    /// Hits in the order the hybrid query returned them.
    pub results: Vec<QueryHit>,
    /// Graph node count reported by the colony when the query ran.
    pub total_nodes: usize,
    /// Graph edge count reported by the colony when the query ran.
    pub total_edges: usize,
}

/// One entry of a [`QueryResult`].
#[derive(Debug, Clone, PartialEq)]
pub struct QueryHit {
    /// Label of the matched item.
    pub label: String,
    /// Final score reported by the hybrid query for this hit.
    pub score: f64,
    /// TF-IDF component of the score.
    pub tfidf_score: f64,
    /// Graph component of the score.
    pub graph_score: f64,
}
54
55#[derive(Clone)]
57pub struct AppState {
58 cmd_tx: mpsc::Sender<ColonyCommand>,
60 pub event_tx: broadcast::Sender<ColonyEvent>,
62}
63
64impl AppState {
65 pub fn new(_db_path: Option<String>) -> Result<Self> {
67 let (cmd_tx, cmd_rx) = mpsc::channel();
68 let (event_tx, _) = broadcast::channel(1000);
69 let event_tx_clone = event_tx.clone();
70
71 thread::spawn(move || {
73 let mut colony = Colony::from_config(ColonyConfig::default());
74
75 while let Ok(cmd) = cmd_rx.recv() {
76 match cmd {
77 ColonyCommand::GetStats(response) => {
78 let _ = response.send(colony.stats());
79 }
80 ColonyCommand::GetSnapshot(response) => {
81 let _ = response.send(colony.snapshot());
82 }
83 ColonyCommand::RunTicks(ticks, response) => {
84 let all_events = colony.run(ticks);
85 for events in &all_events {
87 for event in events {
88 let _ = event_tx_clone.send(event.clone());
89 }
90 }
91 let _ = response.send(all_events);
92 }
93 ColonyCommand::Ingest { title, content, position, ticks, response } => {
94 use phago::prelude::Digester;
95
96 let before_nodes = colony.stats().graph_nodes;
97 let before_edges = colony.stats().graph_edges;
98
99 let doc_id = colony.ingest_document(&title, &content, position);
100 colony.spawn(Box::new(Digester::new(position).with_max_idle(30)));
101
102 let all_events = colony.run(ticks);
103 for events in &all_events {
104 for event in events {
105 let _ = event_tx_clone.send(event.clone());
106 }
107 }
108
109 let after_nodes = colony.stats().graph_nodes;
110 let after_edges = colony.stats().graph_edges;
111
112 let _ = response.send(IngestResult {
113 document_id: format!("{}", doc_id.0),
114 nodes_created: after_nodes.saturating_sub(before_nodes),
115 edges_created: after_edges.saturating_sub(before_edges),
116 tick: colony.stats().tick,
117 });
118 }
119 ColonyCommand::Query { query, max_results, alpha, response } => {
120 use phago::rag::{hybrid_query, HybridConfig};
121
122 let config = HybridConfig {
123 alpha,
124 max_results,
125 candidate_multiplier: 3,
126 };
127 let results = hybrid_query(&colony, &query, &config);
128 let stats = colony.stats();
129
130 let _ = response.send(QueryResult {
131 results: results
132 .into_iter()
133 .map(|r| QueryHit {
134 label: r.label,
135 score: r.final_score,
136 tfidf_score: r.tfidf_score,
137 graph_score: r.graph_score,
138 })
139 .collect(),
140 total_nodes: stats.graph_nodes,
141 total_edges: stats.graph_edges,
142 });
143 }
144 }
145 }
146 });
147
148 Ok(Self { cmd_tx, event_tx })
149 }
150
151 pub async fn stats(&self) -> ColonyStats {
153 let (tx, rx) = oneshot::channel();
154 let _ = self.cmd_tx.send(ColonyCommand::GetStats(tx));
155 rx.await.unwrap_or_else(|_| ColonyStats {
156 tick: 0,
157 agents_alive: 0,
158 agents_died: 0,
159 total_spawned: 0,
160 graph_nodes: 0,
161 graph_edges: 0,
162 total_signals: 0,
163 documents_total: 0,
164 documents_digested: 0,
165 })
166 }
167
168 pub async fn snapshot(&self) -> ColonySnapshot {
170 let (tx, rx) = oneshot::channel();
171 let _ = self.cmd_tx.send(ColonyCommand::GetSnapshot(tx));
172 rx.await.unwrap_or_else(|_| ColonySnapshot {
173 tick: 0,
174 agents: vec![],
175 nodes: vec![],
176 edges: vec![],
177 stats: ColonyStats {
178 tick: 0,
179 agents_alive: 0,
180 agents_died: 0,
181 total_spawned: 0,
182 graph_nodes: 0,
183 graph_edges: 0,
184 total_signals: 0,
185 documents_total: 0,
186 documents_digested: 0,
187 },
188 })
189 }
190
191 pub async fn run(&self, ticks: u64) -> Vec<Vec<ColonyEvent>> {
193 let (tx, rx) = oneshot::channel();
194 let _ = self.cmd_tx.send(ColonyCommand::RunTicks(ticks, tx));
195 rx.await.unwrap_or_default()
196 }
197
198 pub async fn ingest(&self, title: String, content: String, position: Position, ticks: u64) -> IngestResult {
200 let (tx, rx) = oneshot::channel();
201 let _ = self.cmd_tx.send(ColonyCommand::Ingest {
202 title,
203 content,
204 position,
205 ticks,
206 response: tx,
207 });
208 rx.await.unwrap_or_else(|_| IngestResult {
209 document_id: "error".to_string(),
210 nodes_created: 0,
211 edges_created: 0,
212 tick: 0,
213 })
214 }
215
216 pub async fn query(&self, query: String, max_results: usize, alpha: f64) -> QueryResult {
218 let (tx, rx) = oneshot::channel();
219 let _ = self.cmd_tx.send(ColonyCommand::Query {
220 query,
221 max_results,
222 alpha,
223 response: tx,
224 });
225 rx.await.unwrap_or_else(|_| QueryResult {
226 results: vec![],
227 total_nodes: 0,
228 total_edges: 0,
229 })
230 }
231
232 pub fn subscribe(&self) -> broadcast::Receiver<ColonyEvent> {
234 self.event_tx.subscribe()
235 }
236}