// envoy/engine.rs

1use sqlitegraph::backend::native::v3::pubsub::{PubSubEvent, Publisher};
2use sqlitegraph::{GraphEdge, GraphEntity, SqliteGraph};
3
4use crate::error::{EnvoyError, Result};
5use crate::types::{AgentStatus, Channel, EngineStats, Event, EventPayload, Subscription};
6
/// Entity kind stored on graph nodes that represent a channel.
const KIND_CHANNEL: &str = "EnvoyChannel";
/// Entity kind stored on graph nodes that represent a published event.
const KIND_EVENT: &str = "EnvoyEvent";
/// Entity kind stored on graph nodes that represent an agent's subscription.
const KIND_SUBSCRIPTION: &str = "EnvoySubscription";
/// Entity kind stored on the per-channel sequence counter node.
const KIND_SEQ_COUNTER: &str = "EnvoySeqCounter";

/// Edge type linking an event node to the channel it was published in.
const EDGE_POSTED_IN: &str = "POSTED_IN";
/// Edge type linking a subscription node to the channel it follows.
const EDGE_SUBSCRIBES_TO: &str = "SUBSCRIBES_TO";
14
15/// The envoy coordination engine — wraps sqlitegraph's graph database
16/// and pub/sub Publisher for agent-oriented coordination.
17///
18/// Uses sqlitegraph 2.2.0's indexed kind-based lookups (`find_entity_by_kind_and_name`,
19/// `find_entities_by_kind`) for O(1) entity resolution without in-memory caches.
20pub struct Engine {
21    graph: SqliteGraph,
22    publisher: Publisher,
23}
24
25impl Engine {
26    /// Open (or create) an envoy database backed by sqlitegraph.
27    pub fn open(path: &str) -> Result<Self> {
28        let graph = SqliteGraph::open(path)?;
29        let publisher = Publisher::new();
30        Ok(Self { graph, publisher })
31    }
32
33    /// Open an in-memory engine for testing.
34    pub fn open_in_memory() -> Result<Self> {
35        let graph = SqliteGraph::open_in_memory()?;
36        let publisher = Publisher::new();
37        Ok(Self { graph, publisher })
38    }
39
40    /// Access the underlying sqlitegraph Publisher for real-time event listeners.
41    pub fn publisher(&self) -> &Publisher {
42        &self.publisher
43    }
44
45    /// Access the underlying sqlitegraph for direct graph operations.
46    pub fn graph(&self) -> &SqliteGraph {
47        &self.graph
48    }
49
50    // ── Channels ──
51
52    pub fn create_channel(&self, name: &str, description: &str) -> Result<Channel> {
53        if self
54            .graph
55            .find_entity_by_kind_and_name(KIND_CHANNEL, name)?
56            .is_some()
57        {
58            return Err(EnvoyError::ChannelAlreadyExists(name.to_string()));
59        }
60
61        let now = chrono::Utc::now().to_rfc3339();
62        let entity = GraphEntity {
63            id: 0,
64            kind: KIND_CHANNEL.to_string(),
65            name: name.to_string(),
66            file_path: None,
67            data: serde_json::json!({"description": description, "created_at": &now}),
68        };
69        let id = self.graph.insert_entity(&entity)?;
70
71        self.publisher.emit(PubSubEvent::NodeChanged {
72            node_id: id,
73            snapshot_id: 0,
74        });
75
76        Ok(Channel {
77            id,
78            name: name.to_string(),
79            description: description.to_string(),
80            created_at: now,
81        })
82    }
83
84    pub fn get_channel(&self, name: &str) -> Result<Channel> {
85        let entity = self
86            .graph
87            .find_entity_by_kind_and_name(KIND_CHANNEL, name)?
88            .ok_or_else(|| EnvoyError::ChannelNotFound(name.to_string()))?;
89        let desc = entity
90            .data
91            .get("description")
92            .and_then(|v| v.as_str())
93            .unwrap_or("");
94        let created_at = entity
95            .data
96            .get("created_at")
97            .and_then(|v| v.as_str())
98            .unwrap_or("");
99        Ok(Channel {
100            id: entity.id,
101            name: entity.name.clone(),
102            description: desc.to_string(),
103            created_at: created_at.to_string(),
104        })
105    }
106
107    pub fn get_channel_by_id(&self, id: i64) -> Result<Channel> {
108        let entity = self
109            .graph
110            .get_entity(id)
111            .map_err(|_| EnvoyError::ChannelNotFound(format!("id={id}")))?;
112        if entity.kind != KIND_CHANNEL {
113            return Err(EnvoyError::ChannelNotFound(format!("id={id}")));
114        }
115        let desc = entity
116            .data
117            .get("description")
118            .and_then(|v| v.as_str())
119            .unwrap_or("");
120        let created_at = entity
121            .data
122            .get("created_at")
123            .and_then(|v| v.as_str())
124            .unwrap_or("");
125        Ok(Channel {
126            id: entity.id,
127            name: entity.name.clone(),
128            description: desc.to_string(),
129            created_at: created_at.to_string(),
130        })
131    }
132
133    pub fn list_channels(&self) -> Result<Vec<Channel>> {
134        let entities = self.graph.find_entities_by_kind(KIND_CHANNEL)?;
135        let mut channels = Vec::new();
136        for entity in entities {
137            let desc = entity
138                .data
139                .get("description")
140                .and_then(|v| v.as_str())
141                .unwrap_or("");
142            let created_at = entity
143                .data
144                .get("created_at")
145                .and_then(|v| v.as_str())
146                .unwrap_or("");
147            channels.push(Channel {
148                id: entity.id,
149                name: entity.name.clone(),
150                description: desc.to_string(),
151                created_at: created_at.to_string(),
152            });
153        }
154        Ok(channels)
155    }
156
157    // ── Publishing ──
158
159    pub fn publish(
160        &self,
161        channel_name: &str,
162        sender: &str,
163        payload: EventPayload,
164    ) -> Result<Event> {
165        let channel = self.get_channel(channel_name)?;
166        let now = chrono::Utc::now().to_rfc3339();
167        let next_seq = self.next_sequence_id(channel.id)?;
168
169        let name = format!("event-{}-{}", channel.id, next_seq);
170        let entity = GraphEntity {
171            id: 0,
172            kind: KIND_EVENT.to_string(),
173            name,
174            file_path: None,
175            data: serde_json::json!({
176                "channel_id": channel.id,
177                "channel_name": channel.name,
178                "sender": sender,
179                "payload": serde_json::to_value(&payload)?,
180                "timestamp": now,
181                "sequence_id": next_seq,
182            }),
183        };
184        let id = self.graph.insert_entity(&entity)?;
185
186        let edge = GraphEdge {
187            id: 0,
188            from_id: id,
189            to_id: channel.id,
190            edge_type: EDGE_POSTED_IN.to_string(),
191            data: serde_json::json!({}),
192        };
193        self.graph.insert_edge(&edge)?;
194
195        self.publisher.emit(PubSubEvent::NodeChanged {
196            node_id: id,
197            snapshot_id: 0,
198        });
199
200        Ok(Event {
201            id,
202            channel_id: channel.id,
203            channel_name: channel.name,
204            sender: sender.to_string(),
205            payload,
206            timestamp: now,
207            sequence_id: next_seq,
208        })
209    }
210
211    pub fn replay(
212        &self,
213        channel_name: &str,
214        since_sequence: i64,
215        limit: Option<i64>,
216    ) -> Result<Vec<Event>> {
217        let channel = self.get_channel(channel_name)?;
218        let all_events = self.get_channel_events(channel.id)?;
219        let mut events: Vec<Event> = all_events
220            .into_iter()
221            .filter(|e| e.sequence_id > since_sequence)
222            .collect();
223        events.sort_by_key(|e| e.sequence_id);
224        if let Some(limit) = limit {
225            events.truncate(limit as usize);
226        }
227        Ok(events)
228    }
229
230    pub fn catch_up(&self, agent_id: &str, channel_name: &str) -> Result<Vec<Event>> {
231        let sub = self.get_subscription(agent_id, channel_name)?;
232        let events = self.replay(channel_name, sub.last_seen_sequence, None)?;
233        if let Some(last) = events.last() {
234            self.update_last_seen(agent_id, channel_name, last.sequence_id)?;
235        }
236        Ok(events)
237    }
238
239    // ── Subscriptions ──
240
241    pub fn subscribe(&self, agent_id: &str, channel_name: &str) -> Result<Subscription> {
242        let channel = self.get_channel(channel_name)?;
243
244        // Check for existing subscription via indexed kind+name lookup
245        let sub_name = sub_entity_name(agent_id, channel.id);
246        if let Some(existing) = self
247            .graph
248            .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
249        {
250            let last_seen = existing
251                .data
252                .get("last_seen_sequence")
253                .and_then(|v| v.as_i64())
254                .unwrap_or(0);
255            let created_at = read_json_str(&existing.data, "created_at");
256            let updated_at = read_json_str(&existing.data, "updated_at");
257            return Ok(Subscription {
258                agent_id: agent_id.to_string(),
259                channel_id: channel.id,
260                channel_name: channel.name,
261                last_seen_sequence: last_seen,
262                created_at,
263                updated_at,
264            });
265        }
266
267        let current_max = self.max_sequence_id(channel.id)?;
268        let now = chrono::Utc::now().to_rfc3339();
269        let entity = GraphEntity {
270            id: 0,
271            kind: KIND_SUBSCRIPTION.to_string(),
272            name: sub_name,
273            file_path: None,
274            data: serde_json::json!({
275                "agent_id": agent_id,
276                "channel_id": channel.id,
277                "channel_name": channel.name,
278                "last_seen_sequence": current_max,
279                "created_at": &now,
280                "updated_at": &now,
281            }),
282        };
283        let id = self.graph.insert_entity(&entity)?;
284
285        let edge = GraphEdge {
286            id: 0,
287            from_id: id,
288            to_id: channel.id,
289            edge_type: EDGE_SUBSCRIBES_TO.to_string(),
290            data: serde_json::json!({}),
291        };
292        let edge_id = self.graph.insert_edge(&edge)?;
293
294        // Store edge_id in entity data for O(1) deletion on unsubscribe
295        let mut sub_entity = self
296            .graph
297            .get_entity(id)
298            .map_err(|_| EnvoyError::InvalidEntity("subscription not found after insert".into()))?;
299        sub_entity.data["sub_edge_id"] = serde_json::json!(edge_id);
300        self.graph.update_entity(&sub_entity)?;
301
302        self.publisher.emit(PubSubEvent::NodeChanged {
303            node_id: id,
304            snapshot_id: 0,
305        });
306
307        Ok(Subscription {
308            agent_id: agent_id.to_string(),
309            channel_id: channel.id,
310            channel_name: channel.name,
311            last_seen_sequence: current_max,
312            created_at: now.clone(),
313            updated_at: now,
314        })
315    }
316
317    pub fn unsubscribe(&self, agent_id: &str, channel_name: &str) -> Result<()> {
318        let channel = self.get_channel(channel_name)?;
319        let sub_name = sub_entity_name(agent_id, channel.id);
320        let sub_entity = self
321            .graph
322            .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
323            .ok_or_else(|| EnvoyError::NotSubscribed {
324                agent: agent_id.to_string(),
325                channel: channel.name.clone(),
326            })?;
327
328        // O(1): delete edge by stored ID instead of scanning all SUBSCRIBES_TO edges
329        if let Some(edge_id) = sub_entity.data.get("sub_edge_id").and_then(|v| v.as_i64()) {
330            if let Err(e) = self.graph.delete_edge(edge_id) {
331                eprintln!(
332                    "warn: failed to delete subscription edge {}: {}",
333                    edge_id, e
334                );
335            }
336        }
337
338        self.graph.delete_entity(sub_entity.id)?;
339        Ok(())
340    }
341
342    pub fn get_subscription(&self, agent_id: &str, channel_name: &str) -> Result<Subscription> {
343        let channel = self.get_channel(channel_name)?;
344        let sub_name = sub_entity_name(agent_id, channel.id);
345        let entity = self
346            .graph
347            .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
348            .ok_or_else(|| EnvoyError::NotSubscribed {
349                agent: agent_id.to_string(),
350                channel: channel.name.clone(),
351            })?;
352        let last_seen = entity
353            .data
354            .get("last_seen_sequence")
355            .and_then(|v| v.as_i64())
356            .unwrap_or(0);
357        let created_at = read_json_str(&entity.data, "created_at");
358        let updated_at = read_json_str(&entity.data, "updated_at");
359        Ok(Subscription {
360            agent_id: agent_id.to_string(),
361            channel_id: channel.id,
362            channel_name: channel.name,
363            last_seen_sequence: last_seen,
364            created_at,
365            updated_at,
366        })
367    }
368
369    pub fn list_subscriptions(&self, agent_id: &str) -> Result<Vec<Subscription>> {
370        let entities = self.graph.find_entities_by_kind(KIND_SUBSCRIPTION)?;
371        let mut subs = Vec::new();
372        for entity in entities {
373            let data_agent = entity
374                .data
375                .get("agent_id")
376                .and_then(|v| v.as_str())
377                .unwrap_or("");
378            if data_agent == agent_id {
379                let channel_id = entity
380                    .data
381                    .get("channel_id")
382                    .and_then(|v| v.as_i64())
383                    .unwrap_or(0);
384                let last_seen = entity
385                    .data
386                    .get("last_seen_sequence")
387                    .and_then(|v| v.as_i64())
388                    .unwrap_or(0);
389                let created_at = read_json_str(&entity.data, "created_at");
390                let updated_at = read_json_str(&entity.data, "updated_at");
391                if let Ok(channel) = self.get_channel_by_id(channel_id) {
392                    subs.push(Subscription {
393                        agent_id: agent_id.to_string(),
394                        channel_id,
395                        channel_name: channel.name,
396                        last_seen_sequence: last_seen,
397                        created_at,
398                        updated_at,
399                    });
400                }
401            }
402        }
403        Ok(subs)
404    }
405
406    // ── Status ──
407
408    pub fn status(&self) -> Result<EngineStats> {
409        let channels = self.graph.find_entities_by_kind(KIND_CHANNEL)?.len() as i64;
410        let events = self.graph.find_entities_by_kind(KIND_EVENT)?.len() as i64;
411        let subscriptions = self.graph.find_entities_by_kind(KIND_SUBSCRIPTION)?.len() as i64;
412        Ok(EngineStats {
413            channels,
414            events,
415            subscriptions,
416        })
417    }
418
419    // ── Internal helpers ──
420
421    fn get_channel_events(&self, channel_id: i64) -> Result<Vec<Event>> {
422        let entities = self.graph.find_entities_by_kind(KIND_EVENT)?;
423        let mut events = Vec::new();
424        for entity in entities {
425            let evt_channel_id = entity
426                .data
427                .get("channel_id")
428                .and_then(|v| v.as_i64())
429                .unwrap_or(0);
430            if evt_channel_id == channel_id {
431                events.push(event_from_entity(&entity)?);
432            }
433        }
434        events.sort_by_key(|e| e.sequence_id);
435        Ok(events)
436    }
437
438    fn next_sequence_id(&self, channel_id: i64) -> Result<i64> {
439        let counter_name = seq_counter_name(channel_id);
440        if let Some(mut entity) = self
441            .graph
442            .find_entity_by_kind_and_name(KIND_SEQ_COUNTER, &counter_name)?
443        {
444            let next = entity
445                .data
446                .get("next")
447                .and_then(|v| v.as_i64())
448                .unwrap_or(1);
449            entity.data["next"] = serde_json::json!(next + 1);
450            self.graph.update_entity(&entity)?;
451            Ok(next)
452        } else {
453            let entity = GraphEntity {
454                id: 0,
455                kind: KIND_SEQ_COUNTER.to_string(),
456                name: counter_name,
457                file_path: None,
458                data: serde_json::json!({"next": 2}),
459            };
460            self.graph.insert_entity(&entity)?;
461            Ok(1)
462        }
463    }
464
465    fn max_sequence_id(&self, channel_id: i64) -> Result<i64> {
466        let counter_name = seq_counter_name(channel_id);
467        if let Some(entity) = self
468            .graph
469            .find_entity_by_kind_and_name(KIND_SEQ_COUNTER, &counter_name)?
470        {
471            let next = entity
472                .data
473                .get("next")
474                .and_then(|v| v.as_i64())
475                .unwrap_or(1);
476            Ok(next - 1)
477        } else {
478            Ok(0)
479        }
480    }
481
482    fn update_last_seen(&self, agent_id: &str, channel_name: &str, seq: i64) -> Result<()> {
483        let channel = self.get_channel(channel_name)?;
484        let sub_name = sub_entity_name(agent_id, channel.id);
485        let mut entity = self
486            .graph
487            .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
488            .ok_or_else(|| EnvoyError::NotSubscribed {
489                agent: agent_id.to_string(),
490                channel: channel.name.clone(),
491            })?;
492        entity.data["last_seen_sequence"] = serde_json::json!(seq);
493        entity.data["updated_at"] = serde_json::json!(chrono::Utc::now().to_rfc3339());
494        self.graph.update_entity(&entity)?;
495        Ok(())
496    }
497}
498
/// Build the entity name used to store `agent_id`'s subscription to the
/// channel with id `channel_id`: `sub-<agent_id>-<channel_id>`.
fn sub_entity_name(agent_id: &str, channel_id: i64) -> String {
    format!("sub-{agent_id}-{channel_id}")
}
502
/// Build the entity name of the sequence counter for the channel with id
/// `channel_id`: `seq-<channel_id>`.
fn seq_counter_name(channel_id: i64) -> String {
    let mut name = String::from("seq-");
    name.push_str(&channel_id.to_string());
    name
}
506
507fn read_json_str(data: &serde_json::Value, key: &str) -> String {
508    data.get(key)
509        .and_then(|v| v.as_str())
510        .unwrap_or("")
511        .to_string()
512}
513
514fn event_from_entity(entity: &GraphEntity) -> Result<Event> {
515    let channel_id = entity
516        .data
517        .get("channel_id")
518        .and_then(|v| v.as_i64())
519        .unwrap_or(0);
520    let channel_name = entity
521        .data
522        .get("channel_name")
523        .and_then(|v| v.as_str())
524        .unwrap_or("unknown");
525    let sender = entity
526        .data
527        .get("sender")
528        .and_then(|v| v.as_str())
529        .unwrap_or("unknown");
530    let timestamp = entity
531        .data
532        .get("timestamp")
533        .and_then(|v| v.as_str())
534        .unwrap_or("");
535    let sequence_id = entity
536        .data
537        .get("sequence_id")
538        .and_then(|v| v.as_i64())
539        .unwrap_or(0);
540
541    let payload = entity
542        .data
543        .get("payload")
544        .and_then(|v| serde_json::from_value(v.clone()).ok())
545        .unwrap_or_else(|| EventPayload {
546            status: AgentStatus::Working,
547            working_on: "unknown".into(),
548            waiting_for: None,
549            can_start: None,
550            verified: false,
551            magellan_trace: None,
552            extra: serde_json::Value::Null,
553        });
554
555    Ok(Event {
556        id: entity.id,
557        channel_id,
558        channel_name: channel_name.to_string(),
559        sender: sender.to_string(),
560        payload,
561        timestamp: timestamp.to_string(),
562        sequence_id,
563    })
564}