1use sqlitegraph::backend::native::v3::pubsub::{PubSubEvent, Publisher};
2use sqlitegraph::{GraphEdge, GraphEntity, SqliteGraph};
3
4use crate::error::{EnvoyError, Result};
5use crate::types::{AgentStatus, Channel, EngineStats, Event, EventPayload, Subscription};
6
7const KIND_CHANNEL: &str = "EnvoyChannel";
8const KIND_EVENT: &str = "EnvoyEvent";
9const KIND_SUBSCRIPTION: &str = "EnvoySubscription";
10const KIND_SEQ_COUNTER: &str = "EnvoySeqCounter";
11
12const EDGE_POSTED_IN: &str = "POSTED_IN";
13const EDGE_SUBSCRIBES_TO: &str = "SUBSCRIBES_TO";
14
15pub struct Engine {
21 graph: SqliteGraph,
22 publisher: Publisher,
23}
24
25impl Engine {
26 pub fn open(path: &str) -> Result<Self> {
28 let graph = SqliteGraph::open(path)?;
29 let publisher = Publisher::new();
30 Ok(Self { graph, publisher })
31 }
32
33 pub fn open_in_memory() -> Result<Self> {
35 let graph = SqliteGraph::open_in_memory()?;
36 let publisher = Publisher::new();
37 Ok(Self { graph, publisher })
38 }
39
40 pub fn publisher(&self) -> &Publisher {
42 &self.publisher
43 }
44
45 pub fn graph(&self) -> &SqliteGraph {
47 &self.graph
48 }
49
50 pub fn create_channel(&self, name: &str, description: &str) -> Result<Channel> {
53 if self
54 .graph
55 .find_entity_by_kind_and_name(KIND_CHANNEL, name)?
56 .is_some()
57 {
58 return Err(EnvoyError::ChannelAlreadyExists(name.to_string()));
59 }
60
61 let now = chrono::Utc::now().to_rfc3339();
62 let entity = GraphEntity {
63 id: 0,
64 kind: KIND_CHANNEL.to_string(),
65 name: name.to_string(),
66 file_path: None,
67 data: serde_json::json!({"description": description, "created_at": &now}),
68 };
69 let id = self.graph.insert_entity(&entity)?;
70
71 self.publisher.emit(PubSubEvent::NodeChanged {
72 node_id: id,
73 snapshot_id: 0,
74 });
75
76 Ok(Channel {
77 id,
78 name: name.to_string(),
79 description: description.to_string(),
80 created_at: now,
81 })
82 }
83
84 pub fn get_channel(&self, name: &str) -> Result<Channel> {
85 let entity = self
86 .graph
87 .find_entity_by_kind_and_name(KIND_CHANNEL, name)?
88 .ok_or_else(|| EnvoyError::ChannelNotFound(name.to_string()))?;
89 let desc = entity
90 .data
91 .get("description")
92 .and_then(|v| v.as_str())
93 .unwrap_or("");
94 let created_at = entity
95 .data
96 .get("created_at")
97 .and_then(|v| v.as_str())
98 .unwrap_or("");
99 Ok(Channel {
100 id: entity.id,
101 name: entity.name.clone(),
102 description: desc.to_string(),
103 created_at: created_at.to_string(),
104 })
105 }
106
107 pub fn get_channel_by_id(&self, id: i64) -> Result<Channel> {
108 let entity = self
109 .graph
110 .get_entity(id)
111 .map_err(|_| EnvoyError::ChannelNotFound(format!("id={id}")))?;
112 if entity.kind != KIND_CHANNEL {
113 return Err(EnvoyError::ChannelNotFound(format!("id={id}")));
114 }
115 let desc = entity
116 .data
117 .get("description")
118 .and_then(|v| v.as_str())
119 .unwrap_or("");
120 let created_at = entity
121 .data
122 .get("created_at")
123 .and_then(|v| v.as_str())
124 .unwrap_or("");
125 Ok(Channel {
126 id: entity.id,
127 name: entity.name.clone(),
128 description: desc.to_string(),
129 created_at: created_at.to_string(),
130 })
131 }
132
133 pub fn list_channels(&self) -> Result<Vec<Channel>> {
134 let entities = self.graph.find_entities_by_kind(KIND_CHANNEL)?;
135 let mut channels = Vec::new();
136 for entity in entities {
137 let desc = entity
138 .data
139 .get("description")
140 .and_then(|v| v.as_str())
141 .unwrap_or("");
142 let created_at = entity
143 .data
144 .get("created_at")
145 .and_then(|v| v.as_str())
146 .unwrap_or("");
147 channels.push(Channel {
148 id: entity.id,
149 name: entity.name.clone(),
150 description: desc.to_string(),
151 created_at: created_at.to_string(),
152 });
153 }
154 Ok(channels)
155 }
156
157 pub fn publish(
160 &self,
161 channel_name: &str,
162 sender: &str,
163 payload: EventPayload,
164 ) -> Result<Event> {
165 let channel = self.get_channel(channel_name)?;
166 let now = chrono::Utc::now().to_rfc3339();
167 let next_seq = self.next_sequence_id(channel.id)?;
168
169 let name = format!("event-{}-{}", channel.id, next_seq);
170 let entity = GraphEntity {
171 id: 0,
172 kind: KIND_EVENT.to_string(),
173 name,
174 file_path: None,
175 data: serde_json::json!({
176 "channel_id": channel.id,
177 "channel_name": channel.name,
178 "sender": sender,
179 "payload": serde_json::to_value(&payload)?,
180 "timestamp": now,
181 "sequence_id": next_seq,
182 }),
183 };
184 let id = self.graph.insert_entity(&entity)?;
185
186 let edge = GraphEdge {
187 id: 0,
188 from_id: id,
189 to_id: channel.id,
190 edge_type: EDGE_POSTED_IN.to_string(),
191 data: serde_json::json!({}),
192 };
193 self.graph.insert_edge(&edge)?;
194
195 self.publisher.emit(PubSubEvent::NodeChanged {
196 node_id: id,
197 snapshot_id: 0,
198 });
199
200 Ok(Event {
201 id,
202 channel_id: channel.id,
203 channel_name: channel.name,
204 sender: sender.to_string(),
205 payload,
206 timestamp: now,
207 sequence_id: next_seq,
208 })
209 }
210
211 pub fn replay(
212 &self,
213 channel_name: &str,
214 since_sequence: i64,
215 limit: Option<i64>,
216 ) -> Result<Vec<Event>> {
217 let channel = self.get_channel(channel_name)?;
218 let all_events = self.get_channel_events(channel.id)?;
219 let mut events: Vec<Event> = all_events
220 .into_iter()
221 .filter(|e| e.sequence_id > since_sequence)
222 .collect();
223 events.sort_by_key(|e| e.sequence_id);
224 if let Some(limit) = limit {
225 events.truncate(limit as usize);
226 }
227 Ok(events)
228 }
229
230 pub fn catch_up(&self, agent_id: &str, channel_name: &str) -> Result<Vec<Event>> {
231 let sub = self.get_subscription(agent_id, channel_name)?;
232 let events = self.replay(channel_name, sub.last_seen_sequence, None)?;
233 if let Some(last) = events.last() {
234 self.update_last_seen(agent_id, channel_name, last.sequence_id)?;
235 }
236 Ok(events)
237 }
238
239 pub fn subscribe(&self, agent_id: &str, channel_name: &str) -> Result<Subscription> {
242 let channel = self.get_channel(channel_name)?;
243
244 let sub_name = sub_entity_name(agent_id, channel.id);
246 if let Some(existing) = self
247 .graph
248 .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
249 {
250 let last_seen = existing
251 .data
252 .get("last_seen_sequence")
253 .and_then(|v| v.as_i64())
254 .unwrap_or(0);
255 let created_at = read_json_str(&existing.data, "created_at");
256 let updated_at = read_json_str(&existing.data, "updated_at");
257 return Ok(Subscription {
258 agent_id: agent_id.to_string(),
259 channel_id: channel.id,
260 channel_name: channel.name,
261 last_seen_sequence: last_seen,
262 created_at,
263 updated_at,
264 });
265 }
266
267 let current_max = self.max_sequence_id(channel.id)?;
268 let now = chrono::Utc::now().to_rfc3339();
269 let entity = GraphEntity {
270 id: 0,
271 kind: KIND_SUBSCRIPTION.to_string(),
272 name: sub_name,
273 file_path: None,
274 data: serde_json::json!({
275 "agent_id": agent_id,
276 "channel_id": channel.id,
277 "channel_name": channel.name,
278 "last_seen_sequence": current_max,
279 "created_at": &now,
280 "updated_at": &now,
281 }),
282 };
283 let id = self.graph.insert_entity(&entity)?;
284
285 let edge = GraphEdge {
286 id: 0,
287 from_id: id,
288 to_id: channel.id,
289 edge_type: EDGE_SUBSCRIBES_TO.to_string(),
290 data: serde_json::json!({}),
291 };
292 let edge_id = self.graph.insert_edge(&edge)?;
293
294 let mut sub_entity = self
296 .graph
297 .get_entity(id)
298 .map_err(|_| EnvoyError::InvalidEntity("subscription not found after insert".into()))?;
299 sub_entity.data["sub_edge_id"] = serde_json::json!(edge_id);
300 self.graph.update_entity(&sub_entity)?;
301
302 self.publisher.emit(PubSubEvent::NodeChanged {
303 node_id: id,
304 snapshot_id: 0,
305 });
306
307 Ok(Subscription {
308 agent_id: agent_id.to_string(),
309 channel_id: channel.id,
310 channel_name: channel.name,
311 last_seen_sequence: current_max,
312 created_at: now.clone(),
313 updated_at: now,
314 })
315 }
316
317 pub fn unsubscribe(&self, agent_id: &str, channel_name: &str) -> Result<()> {
318 let channel = self.get_channel(channel_name)?;
319 let sub_name = sub_entity_name(agent_id, channel.id);
320 let sub_entity = self
321 .graph
322 .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
323 .ok_or_else(|| EnvoyError::NotSubscribed {
324 agent: agent_id.to_string(),
325 channel: channel.name.clone(),
326 })?;
327
328 if let Some(edge_id) = sub_entity.data.get("sub_edge_id").and_then(|v| v.as_i64()) {
330 if let Err(e) = self.graph.delete_edge(edge_id) {
331 eprintln!(
332 "warn: failed to delete subscription edge {}: {}",
333 edge_id, e
334 );
335 }
336 }
337
338 self.graph.delete_entity(sub_entity.id)?;
339 Ok(())
340 }
341
342 pub fn get_subscription(&self, agent_id: &str, channel_name: &str) -> Result<Subscription> {
343 let channel = self.get_channel(channel_name)?;
344 let sub_name = sub_entity_name(agent_id, channel.id);
345 let entity = self
346 .graph
347 .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
348 .ok_or_else(|| EnvoyError::NotSubscribed {
349 agent: agent_id.to_string(),
350 channel: channel.name.clone(),
351 })?;
352 let last_seen = entity
353 .data
354 .get("last_seen_sequence")
355 .and_then(|v| v.as_i64())
356 .unwrap_or(0);
357 let created_at = read_json_str(&entity.data, "created_at");
358 let updated_at = read_json_str(&entity.data, "updated_at");
359 Ok(Subscription {
360 agent_id: agent_id.to_string(),
361 channel_id: channel.id,
362 channel_name: channel.name,
363 last_seen_sequence: last_seen,
364 created_at,
365 updated_at,
366 })
367 }
368
369 pub fn list_subscriptions(&self, agent_id: &str) -> Result<Vec<Subscription>> {
370 let entities = self.graph.find_entities_by_kind(KIND_SUBSCRIPTION)?;
371 let mut subs = Vec::new();
372 for entity in entities {
373 let data_agent = entity
374 .data
375 .get("agent_id")
376 .and_then(|v| v.as_str())
377 .unwrap_or("");
378 if data_agent == agent_id {
379 let channel_id = entity
380 .data
381 .get("channel_id")
382 .and_then(|v| v.as_i64())
383 .unwrap_or(0);
384 let last_seen = entity
385 .data
386 .get("last_seen_sequence")
387 .and_then(|v| v.as_i64())
388 .unwrap_or(0);
389 let created_at = read_json_str(&entity.data, "created_at");
390 let updated_at = read_json_str(&entity.data, "updated_at");
391 if let Ok(channel) = self.get_channel_by_id(channel_id) {
392 subs.push(Subscription {
393 agent_id: agent_id.to_string(),
394 channel_id,
395 channel_name: channel.name,
396 last_seen_sequence: last_seen,
397 created_at,
398 updated_at,
399 });
400 }
401 }
402 }
403 Ok(subs)
404 }
405
406 pub fn status(&self) -> Result<EngineStats> {
409 let channels = self.graph.find_entities_by_kind(KIND_CHANNEL)?.len() as i64;
410 let events = self.graph.find_entities_by_kind(KIND_EVENT)?.len() as i64;
411 let subscriptions = self.graph.find_entities_by_kind(KIND_SUBSCRIPTION)?.len() as i64;
412 Ok(EngineStats {
413 channels,
414 events,
415 subscriptions,
416 })
417 }
418
419 fn get_channel_events(&self, channel_id: i64) -> Result<Vec<Event>> {
422 let entities = self.graph.find_entities_by_kind(KIND_EVENT)?;
423 let mut events = Vec::new();
424 for entity in entities {
425 let evt_channel_id = entity
426 .data
427 .get("channel_id")
428 .and_then(|v| v.as_i64())
429 .unwrap_or(0);
430 if evt_channel_id == channel_id {
431 events.push(event_from_entity(&entity)?);
432 }
433 }
434 events.sort_by_key(|e| e.sequence_id);
435 Ok(events)
436 }
437
438 fn next_sequence_id(&self, channel_id: i64) -> Result<i64> {
439 let counter_name = seq_counter_name(channel_id);
440 if let Some(mut entity) = self
441 .graph
442 .find_entity_by_kind_and_name(KIND_SEQ_COUNTER, &counter_name)?
443 {
444 let next = entity
445 .data
446 .get("next")
447 .and_then(|v| v.as_i64())
448 .unwrap_or(1);
449 entity.data["next"] = serde_json::json!(next + 1);
450 self.graph.update_entity(&entity)?;
451 Ok(next)
452 } else {
453 let entity = GraphEntity {
454 id: 0,
455 kind: KIND_SEQ_COUNTER.to_string(),
456 name: counter_name,
457 file_path: None,
458 data: serde_json::json!({"next": 2}),
459 };
460 self.graph.insert_entity(&entity)?;
461 Ok(1)
462 }
463 }
464
465 fn max_sequence_id(&self, channel_id: i64) -> Result<i64> {
466 let counter_name = seq_counter_name(channel_id);
467 if let Some(entity) = self
468 .graph
469 .find_entity_by_kind_and_name(KIND_SEQ_COUNTER, &counter_name)?
470 {
471 let next = entity
472 .data
473 .get("next")
474 .and_then(|v| v.as_i64())
475 .unwrap_or(1);
476 Ok(next - 1)
477 } else {
478 Ok(0)
479 }
480 }
481
482 fn update_last_seen(&self, agent_id: &str, channel_name: &str, seq: i64) -> Result<()> {
483 let channel = self.get_channel(channel_name)?;
484 let sub_name = sub_entity_name(agent_id, channel.id);
485 let mut entity = self
486 .graph
487 .find_entity_by_kind_and_name(KIND_SUBSCRIPTION, &sub_name)?
488 .ok_or_else(|| EnvoyError::NotSubscribed {
489 agent: agent_id.to_string(),
490 channel: channel.name.clone(),
491 })?;
492 entity.data["last_seen_sequence"] = serde_json::json!(seq);
493 entity.data["updated_at"] = serde_json::json!(chrono::Utc::now().to_rfc3339());
494 self.graph.update_entity(&entity)?;
495 Ok(())
496 }
497}
498
499fn sub_entity_name(agent_id: &str, channel_id: i64) -> String {
500 format!("sub-{}-{}", agent_id, channel_id)
501}
502
503fn seq_counter_name(channel_id: i64) -> String {
504 format!("seq-{channel_id}")
505}
506
507fn read_json_str(data: &serde_json::Value, key: &str) -> String {
508 data.get(key)
509 .and_then(|v| v.as_str())
510 .unwrap_or("")
511 .to_string()
512}
513
514fn event_from_entity(entity: &GraphEntity) -> Result<Event> {
515 let channel_id = entity
516 .data
517 .get("channel_id")
518 .and_then(|v| v.as_i64())
519 .unwrap_or(0);
520 let channel_name = entity
521 .data
522 .get("channel_name")
523 .and_then(|v| v.as_str())
524 .unwrap_or("unknown");
525 let sender = entity
526 .data
527 .get("sender")
528 .and_then(|v| v.as_str())
529 .unwrap_or("unknown");
530 let timestamp = entity
531 .data
532 .get("timestamp")
533 .and_then(|v| v.as_str())
534 .unwrap_or("");
535 let sequence_id = entity
536 .data
537 .get("sequence_id")
538 .and_then(|v| v.as_i64())
539 .unwrap_or(0);
540
541 let payload = entity
542 .data
543 .get("payload")
544 .and_then(|v| serde_json::from_value(v.clone()).ok())
545 .unwrap_or_else(|| EventPayload {
546 status: AgentStatus::Working,
547 working_on: "unknown".into(),
548 waiting_for: None,
549 can_start: None,
550 verified: false,
551 magellan_trace: None,
552 extra: serde_json::Value::Null,
553 });
554
555 Ok(Event {
556 id: entity.id,
557 channel_id,
558 channel_name: channel_name.to_string(),
559 sender: sender.to_string(),
560 payload,
561 timestamp: timestamp.to_string(),
562 sequence_id,
563 })
564}