use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{RwLock, broadcast, mpsc};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::websocket::{WebSocketConfig, WebSocketMessage, WebSocketServer};
use crate::function::{FunctionEntry, FunctionRegistry};
use crate::gateway::websocket::{JobData, WorkflowData, WorkflowStepData};

/// Configuration for the realtime [`Reactor`].
#[derive(Debug, Clone, Default)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub websocket: WebSocketConfig,
}

/// A live query subscription held on behalf of a connected session.
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
    pub auth_context: forge_core::function::AuthContext,
}

/// A subscription to progress updates for a single background job.
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid,
}

/// A subscription to progress updates for a single workflow run.
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid,
}

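/// Coordinates realtime subscriptions: it listens for database changes,
/// invalidates affected queries, and pushes fresh results, job updates, and
/// workflow updates to connected WebSocket sessions.
///
/// A minimal lifecycle sketch (marked `ignore`; it assumes an already
/// configured `sqlx::PgPool`, a populated `FunctionRegistry`, and a `NodeId`
/// for this node):
///
/// ```ignore
/// let reactor = Reactor::new(node_id, db_pool, registry, ReactorConfig::default());
/// reactor.start().await?;
/// // ... register sessions and route messages via reactor.ws_server() ...
/// reactor.stop();
/// ```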
pub struct Reactor {
    #[allow(dead_code)]
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    ws_server: Arc<WebSocketServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    shutdown_tx: broadcast::Sender<()>,
}

impl Reactor {
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.websocket.max_subscriptions_per_connection,
        ));
        let ws_server = Arc::new(WebSocketServer::new(node_id, config.websocket));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            ws_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
        }
    }

    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    pub fn ws_server(&self) -> Arc<WebSocketServer> {
        self.ws_server.clone()
    }

    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<WebSocketMessage>,
    ) {
        self.ws_server.register_connection(session_id, sender).await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.ws_server.remove_connection(session_id).await {
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        // Drop any job subscriptions held by this session.
        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            job_subs.retain(|_, v| !v.is_empty());
        }

        // Drop any workflow subscriptions held by this session.
        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

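    /// Creates a live query subscription for a session.
    ///
    /// Runs the query once, records its read set and result hash so later
    /// table changes can invalidate it, and returns the initial result to
    /// send to the client.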
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
        auth_context: forge_core::function::AuthContext,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        self.ws_server
            .add_subscription(session_id, subscription_id)
            .await?;

        // Run the query once to obtain the initial result and its read set.
        let (data, read_set) = self
            .execute_query(&query_name, &args, &auth_context)
            .await?;

        let result_hash = Self::compute_hash(&data);

        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
            auth_context,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.ws_server.remove_subscription(subscription_id).await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid,
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        // Fetch the current job state so the client gets an immediate snapshot.
        let job_data = self.fetch_job_data(job_id).await?;

        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid,
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        // Fetch the current workflow state so the client gets an immediate snapshot.
        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

    /// Instance wrapper around [`Self::fetch_job_data_static`], which owns the
    /// actual SQL.
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        Self::fetch_job_data_static(job_id, &self.db_pool).await
    }

    /// Instance wrapper around [`Self::fetch_workflow_data_static`], which
    /// owns the actual SQL.
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        Self::fetch_workflow_data_static(workflow_id, &self.db_pool).await
    }

    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                // Treat an empty args object as "no arguments".
                let normalized_args = match args {
                    v if v.as_object().map_or(false, |o| o.is_empty()) => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found, using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies"
                    );
                }

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Note: DefaultHasher's algorithm is not guaranteed to be stable
        // across Rust releases. That is fine here, since hashes are only
        // compared within a running process.
        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

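    /// Spawns the change-listener task and the reactor fan-out loop, then
    /// returns immediately. Both tasks run until [`Reactor::stop`] is called
    /// or the change channel closes.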
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let ws_server = self.ws_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        // Run the Postgres change listener on its own task.
        let listener_clone = listener.clone();
        let listener_handle = tokio::spawn(async move {
            if let Err(e) = listener_clone.run().await {
                tracing::error!("Change listener error: {}", e);
            }
        });

        let mut change_rx = listener.subscribe();

        // Main reactor loop: fan changes out to subscribers until shutdown.
        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            loop {
                tokio::select! {
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &ws_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            listener_handle.abort();
        });

        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(
            table = %change.table,
            op = ?change.operation,
            row_id = ?change.row_id,
            "Processing change"
        );

        // Job and workflow tables are handled by dedicated push paths.
        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, ws_server, db_pool).await;
                }
                return;
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            "forge_workflow_steps" => {
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            _ => {}
        }

        // All other tables go through read-set invalidation.
        invalidation_engine.process_change(change.clone()).await;

        let invalidated = invalidation_engine.flush_all().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        // Snapshot the affected subscriptions so the read lock is not held
        // across query re-execution.
        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                            active.auth_context.clone(),
                        )
                    })
                })
                .collect()
        };

        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        for (sub_id, session_id, query_name, args, last_hash, auth_context) in subs_to_process {
            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
                .await
            {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    // Only push when the result actually changed.
                    if last_hash.as_ref() != Some(&new_hash) {
                        let message = WebSocketMessage::Data {
                            subscription_id: sub_id,
                            data: new_data,
                        };

                        if let Err(e) = ws_server.send_to_session(session_id, message).await {
                            tracing::warn!(?sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(?sub_id, "Pushed update to client");
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(?sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        // Record the new result hashes for the subscriptions we updated.
        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        // Release the read lock before hitting the database.
        drop(subs);

        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = WebSocketMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        // Release the read lock before hitting the database.
        drop(subs);

        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = WebSocketMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        // A step change is surfaced as an update on its parent workflow run.
        let workflow_id: Option<Uuid> =
            sqlx::query_scalar("SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1")
                .bind(step_id)
                .fetch_optional(db_pool)
                .await
                .ok()
                .flatten();

        if let Some(wf_id) = workflow_id {
            Self::handle_workflow_change(wf_id, workflow_subscriptions, ws_server, db_pool).await;
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                // Treat an empty args object as "no arguments".
                let normalized_args = match args {
                    v if v.as_object().map_or(false, |o| o.is_empty()) => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found (static), using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies (static)"
                    );
                }

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

    /// Derives a fallback table name from a query name by stripping a common
    /// verb prefix, e.g. `list_users` -> `users`.
    fn extract_table_name(query_name: &str) -> String {
        for prefix in ["get_", "list_", "find_", "fetch_"] {
            if let Some(rest) = query_name.strip_prefix(prefix) {
                return rest.to_string();
            }
        }
        query_name.to_string()
    }

    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

    pub async fn stats(&self) -> ReactorStats {
        let ws_stats = self.ws_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: ws_stats.connections,
            subscriptions: ws_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

/// A point-in-time snapshot of reactor activity, as reported by
/// [`Reactor::stats`].
#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
    }

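    // A hedged determinism check for the result-diffing hash: with
    // serde_json's default BTreeMap-backed Map, object keys serialize in
    // sorted order, so logically equal objects hash equally. This assumption
    // does not hold if serde_json's `preserve_order` feature is enabled.
    #[test]
    fn test_compute_hash_is_key_order_independent() {
        let a = serde_json::json!({"a": 1, "b": 2});
        let b = serde_json::json!({"b": 2, "a": 1});
        assert_eq!(Reactor::compute_hash(&a), Reactor::compute_hash(&b));
    }
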
    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
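
    // Extra coverage for the naming-convention fallback used when a query
    // declares no compile-time table dependencies.
    #[test]
    fn test_extract_table_name() {
        assert_eq!(Reactor::extract_table_name("get_user"), "user");
        assert_eq!(Reactor::extract_table_name("list_users"), "users");
        assert_eq!(Reactor::extract_table_name("find_orders"), "orders");
        assert_eq!(Reactor::extract_table_name("fetch_items"), "items");
        // No recognized prefix: the query name itself is used.
        assert_eq!(
            Reactor::extract_table_name("search_products"),
            "search_products"
        );
    }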
}