use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{RwLock, broadcast, mpsc};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::websocket::{WebSocketConfig, WebSocketMessage, WebSocketServer};
use crate::function::{FunctionEntry, FunctionRegistry};
use crate::gateway::websocket::{JobData, WorkflowData, WorkflowStepData};

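/// Configuration for the reactor and the subsystems it owns.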
#[derive(Debug, Clone, Default)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub websocket: WebSocketConfig,
}

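/// A live query subscription, including the query, its arguments, and the
/// hash of the last result pushed to the client.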
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
}

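/// A session's subscription to status updates for a single job.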
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid,
}

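/// A session's subscription to status updates for a single workflow run.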
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid,
}

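/// Coordinates real-time updates: listens for database changes, invalidates
/// affected query subscriptions, and pushes fresh query results, job updates,
/// and workflow updates to connected WebSocket sessions.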
pub struct Reactor {
    #[allow(dead_code)]
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    ws_server: Arc<WebSocketServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    shutdown_tx: broadcast::Sender<()>,
}

impl Reactor {
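    /// Builds a reactor and its subsystems from the given configuration.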
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.websocket.max_subscriptions_per_connection,
        ));
        let ws_server = Arc::new(WebSocketServer::new(node_id, config.websocket));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            ws_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
        }
    }

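    /// Returns the id of the node this reactor runs on.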
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

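    /// Returns a handle to the WebSocket server.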
    pub fn ws_server(&self) -> Arc<WebSocketServer> {
        self.ws_server.clone()
    }

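    /// Returns a handle to the subscription manager.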
    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

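    /// Returns a receiver that fires when the reactor shuts down.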
    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

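    /// Registers a WebSocket session so the reactor can push messages to it.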
    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<WebSocketMessage>,
    ) {
        self.ws_server.register_connection(session_id, sender).await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

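    /// Removes a session and drops every subscription it owned.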
    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.ws_server.remove_connection(session_id).await {
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            job_subs.retain(|_, v| !v.is_empty());
        }

        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

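    /// Creates a query subscription, runs the query once for its initial
    /// result, and records the read set used for invalidation.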
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        self.ws_server
            .add_subscription(session_id, subscription_id)
            .await?;

        let (data, read_set) = self.execute_query(&query_name, &args).await?;

        let result_hash = Self::compute_hash(&data);

        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

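    /// Tears down a single query subscription.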
    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.ws_server.remove_subscription(subscription_id).await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

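    /// Subscribes a session to a job's status and returns the job's current state.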
    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid,
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        let job_data = self.fetch_job_data(job_id).await?;

        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

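    /// Removes the job subscription matching the session and client-side id.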
    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

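    /// Subscribes a session to a workflow run and returns its current state.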
    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid,
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

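    /// Removes the workflow subscription matching the session and client-side id.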
    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

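    /// Loads the current state of a job from the database.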
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

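    /// Loads the current state of a workflow run, including its steps.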
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

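    /// Executes a registered query and derives the read set used for invalidation.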
    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { handler, .. }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    forge_core::function::AuthContext::unauthenticated(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();
                let table_name = Self::extract_table_name(query_name);
                read_set.add_table(&table_name);

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

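    /// Hashes a serialized query result so changed output can be detected cheaply.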
    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

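    /// Spawns the change listener and the reactor event loop.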
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let ws_server = self.ws_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        let listener_clone = listener.clone();
        let listener_handle = tokio::spawn(async move {
            if let Err(e) = listener_clone.run().await {
                tracing::error!("Change listener error: {}", e);
            }
        });

        let mut change_rx = listener.subscribe();

        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            loop {
                tokio::select! {
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &ws_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            listener_handle.abort();
        });

        Ok(())
    }

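    /// Routes a database change: job and workflow tables get targeted
    /// handling, everything else goes through the invalidation engine.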
    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");

        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, ws_server, db_pool).await;
                }
                return;
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            "forge_workflow_steps" => {
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            _ => {}
        }

        invalidation_engine.process_change(change.clone()).await;

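        // Drain the engine's pending invalidations so affected queries can be re-run.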
        let invalidated = invalidation_engine.flush_all().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                        )
                    })
                })
                .collect()
        };

        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        for (sub_id, session_id, query_name, args, last_hash) in subs_to_process {
            match Self::execute_query_static(registry, db_pool, &query_name, &args).await {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    if last_hash.as_ref() != Some(&new_hash) {
                        let message = WebSocketMessage::Data {
                            subscription_id: sub_id,
                            data: new_data,
                        };

                        if let Err(e) = ws_server.send_to_session(session_id, message).await {
                            tracing::warn!(?sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(?sub_id, "Pushed update to client");
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(?sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

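    /// Pushes the latest job state to every session subscribed to the job.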
    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
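        // Release the read lock before awaiting the database query.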
        drop(subs);

        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = WebSocketMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

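    /// Pushes the latest workflow state to every session subscribed to the run.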
    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
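        // Release the read lock before awaiting the database query.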
        drop(subs);

        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = WebSocketMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

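    /// Resolves a step change to its parent workflow run and re-broadcasts that run's state.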
    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let workflow_id: Option<Uuid> =
            sqlx::query_scalar("SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1")
                .bind(step_id)
                .fetch_optional(db_pool)
                .await
                .ok()
                .flatten();

        if let Some(wf_id) = workflow_id {
            Self::handle_workflow_change(wf_id, workflow_subscriptions, ws_server, db_pool).await;
        }
    }

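    /// Like `fetch_job_data`, but callable without `&self` from the event loop.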
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

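    /// Like `fetch_workflow_data`, but callable without `&self` from the event loop.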
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

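    /// Like `execute_query`, but callable without `&self` from the event loop.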
    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { handler, .. }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    forge_core::function::AuthContext::unauthenticated(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();
                let table_name = Self::extract_table_name(query_name);
                read_set.add_table(&table_name);

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

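    /// Heuristically maps a query name to the table it reads by stripping
    /// common verb prefixes, e.g. `get_users` -> `users`.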
    fn extract_table_name(query_name: &str) -> String {
        if let Some(rest) = query_name.strip_prefix("get_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("list_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("find_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
            rest.to_string()
        } else {
            query_name.to_string()
        }
    }

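    /// Signals shutdown to background tasks and stops the change listener.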
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

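    /// Returns a point-in-time snapshot of reactor statistics.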
    pub async fn stats(&self) -> ReactorStats {
        let ws_stats = self.ws_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: ws_stats.connections,
            subscriptions: ws_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

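/// A point-in-time snapshot of reactor state, suitable for monitoring.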
#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
    }

    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
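
    // Added check: the table-name heuristic strips common verb prefixes and
    // falls back to the raw query name.
    #[test]
    fn test_extract_table_name() {
        assert_eq!(Reactor::extract_table_name("get_users"), "users");
        assert_eq!(Reactor::extract_table_name("list_jobs"), "jobs");
        assert_eq!(Reactor::extract_table_name("find_orders"), "orders");
        assert_eq!(Reactor::extract_table_name("fetch_items"), "items");
        assert_eq!(Reactor::extract_table_name("custom_query"), "custom_query");
    }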
}