use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{RwLock, broadcast, mpsc};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::message::{
    JobData, RealtimeConfig, RealtimeMessage, SessionServer, WorkflowData, WorkflowStepData,
};
use crate::function::{FunctionEntry, FunctionRegistry};

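/// Configuration for the [`Reactor`]: listener, invalidation, and realtime
/// settings, plus the restart policy for the change-listener task.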
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub realtime: RealtimeConfig,
    pub max_listener_restarts: u32,
    pub listener_restart_delay_ms: u64,
}

impl Default for ReactorConfig {
    fn default() -> Self {
        Self {
            listener: ListenerConfig::default(),
            invalidation: InvalidationConfig::default(),
            realtime: RealtimeConfig::default(),
            max_listener_restarts: 5,
            listener_restart_delay_ms: 1000,
        }
    }
}

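/// A live query subscription: the query, its arguments, the auth context it
/// runs under, and the hash of the last result pushed to the client.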
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
    pub auth_context: forge_core::function::AuthContext,
}

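/// A session's subscription to progress updates for a single job.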
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid,
}

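/// A session's subscription to status updates for a single workflow run.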
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid,
}

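/// Drives realtime updates: listens for database change notifications,
/// invalidates affected query subscriptions through the
/// [`InvalidationEngine`], re-executes those queries, and pushes fresh
/// results to connected sessions. Job and workflow tables bypass
/// invalidation and fan out directly to their subscribers.
///
/// An illustrative sketch of wiring one up; the pool, node id, and registry
/// setup are assumptions from the surrounding application, not a prescribed
/// API:
///
/// ```ignore
/// // Sketch only: node_id, db_pool, and registry come from elsewhere.
/// let reactor = Reactor::new(node_id, db_pool, registry, ReactorConfig::default());
/// reactor.start().await?;
/// // ... register sessions and subscriptions as connections arrive ...
/// reactor.stop();
/// ```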
pub struct Reactor {
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    session_server: Arc<SessionServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    shutdown_tx: broadcast::Sender<()>,
    max_listener_restarts: u32,
    listener_restart_delay_ms: u64,
}

impl Reactor {
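    /// Builds the reactor and its supporting components; nothing runs until
    /// [`Reactor::start`] is called.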
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.realtime.max_subscriptions_per_session,
        ));
        let session_server = Arc::new(SessionServer::new(node_id, config.realtime.clone()));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            session_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
            max_listener_restarts: config.max_listener_restarts,
            listener_restart_delay_ms: config.listener_restart_delay_ms,
        }
    }

    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    pub fn session_server(&self) -> Arc<SessionServer> {
        self.session_server.clone()
    }

    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

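    /// Registers a connected session's outbound channel so the reactor can
    /// push [`RealtimeMessage`]s to it.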
    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<RealtimeMessage>,
    ) {
        self.session_server
            .register_connection(session_id, sender)
            .await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

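    /// Tears down a disconnected session: drops its query subscriptions and
    /// removes it from every job and workflow subscriber list.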
    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.session_server.remove_connection(session_id).await {
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            job_subs.retain(|_, v| !v.is_empty());
        }

        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

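    /// Creates a live query subscription: registers it with the manager and
    /// session server, executes the query once, records its read set and
    /// result hash for later invalidation, and returns the subscription id
    /// together with the initial result.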
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
        auth_context: forge_core::function::AuthContext,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        self.session_server
            .add_subscription(session_id, subscription_id)
            .await?;

        let (data, read_set) = self
            .execute_query(&query_name, &args, &auth_context)
            .await?;

        let result_hash = Self::compute_hash(&data);

        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
            auth_context,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.session_server
            .remove_subscription(subscription_id)
            .await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

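    /// Subscribes a session to progress updates for `job_id`, returning the
    /// job's current state as the initial snapshot.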
    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid,
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        let job_data = self.fetch_job_data(job_id).await?;

        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid,
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

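    /// Loads the current status and progress snapshot for a job row, or
    /// `NotFound` if the job does not exist.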
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

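    /// Resolves `query_name` in the registry and runs it, returning the
    /// result plus the read set used for invalidation. Table dependencies
    /// come from compile-time metadata when present, otherwise from the
    /// naming-convention fallback in [`Reactor::extract_table_name`].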
    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found, using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies"
                    );
                }

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

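    /// Hashes a serialized JSON result so successive executions of the same
    /// query can be compared cheaply. `DefaultHasher` makes no cross-version
    /// stability guarantees, which is acceptable here because hashes are only
    /// compared within a single process.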
    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

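    /// Spawns the background reactor task: runs the change listener, routes
    /// each received change through [`Reactor::handle_change`], and restarts
    /// the listener with exponential backoff, giving up after
    /// `max_listener_restarts` failures.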
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let session_server = self.session_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();
        let max_restarts = self.max_listener_restarts;
        let base_delay_ms = self.listener_restart_delay_ms;

        let mut change_rx = listener.subscribe();

        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            let mut restart_count: u32 = 0;
            let (listener_error_tx, mut listener_error_rx) = mpsc::channel::<String>(1);

            let listener_clone = listener.clone();
            let error_tx = listener_error_tx.clone();
            let mut listener_handle = Some(tokio::spawn(async move {
                if let Err(e) = listener_clone.run().await {
                    let _ = error_tx.send(format!("Change listener error: {}", e)).await;
                }
            }));

            loop {
                tokio::select! {
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &session_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    Some(error_msg) = listener_error_rx.recv() => {
                        tracing::error!("Listener failed: {}", error_msg);

                        if restart_count >= max_restarts {
                            tracing::error!(
                                "Listener failed {} times, giving up. Real-time updates disabled.",
                                restart_count
                            );
                            break;
                        }

                        restart_count += 1;
                        let delay = base_delay_ms * 2u64.saturating_pow(restart_count - 1);
                        tracing::warn!(
                            "Restarting listener in {}ms (attempt {}/{})",
                            delay, restart_count, max_restarts
                        );

                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                        let listener_clone = listener.clone();
                        let error_tx = listener_error_tx.clone();
                        if let Some(handle) = listener_handle.take() {
                            handle.abort();
                        }
                        change_rx = listener.subscribe();
                        listener_handle = Some(tokio::spawn(async move {
                            if let Err(e) = listener_clone.run().await {
                                let _ = error_tx.send(format!("Change listener error: {}", e)).await;
                            }
                        }));
                    }
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            if let Some(handle) = listener_handle {
                handle.abort();
            }
        });

        Ok(())
    }

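    /// Routes a single database change. The built-in `forge_jobs`,
    /// `forge_workflow_runs`, and `forge_workflow_steps` tables fan out
    /// directly to their subscribers; all other tables go through the
    /// invalidation engine, and each invalidated query subscription is
    /// re-executed and pushed only if its result hash changed.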
    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");

        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, session_server, db_pool)
                        .await;
                }
                return;
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        session_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            "forge_workflow_steps" => {
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        session_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            _ => {}
        }

        invalidation_engine.process_change(change.clone()).await;

        let invalidated = invalidation_engine.flush_all().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.client_sub_id.clone(),
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                            active.auth_context.clone(),
                        )
                    })
                })
                .collect()
        };

        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        for (sub_id, session_id, client_sub_id, query_name, args, last_hash, auth_context) in
            subs_to_process
        {
            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
                .await
            {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    if last_hash.as_ref() != Some(&new_hash) {
                        let message = RealtimeMessage::Data {
                            subscription_id: client_sub_id.clone(),
                            data: new_data,
                        };

                        if let Err(e) = session_server.send_to_session(session_id, message).await {
                            tracing::warn!(client_id = %client_sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(client_id = %client_sub_id, "Pushed update to client");
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(client_id = %client_sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

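    /// Fetches a fresh snapshot of `job_id` and pushes it to every session
    /// subscribed to that job.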
    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        drop(subs);

        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = RealtimeMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = session_server
                .send_to_session(sub.session_id, message)
                .await
            {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

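    /// Fetches a fresh snapshot of `workflow_id`, including its steps, and
    /// pushes it to every subscribed session.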
    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        drop(subs);

        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = RealtimeMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = session_server
                .send_to_session(sub.session_id, message)
                .await
            {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

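    /// A step row changed: look up its parent workflow run and fan out a
    /// full workflow update.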
    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let workflow_id: Option<Uuid> = match sqlx::query_scalar(
            "SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1",
        )
        .bind(step_id)
        .fetch_optional(db_pool)
        .await
        {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(%step_id, "Failed to look up workflow_run_id for step: {}", e);
                return;
            }
        };

        if let Some(wf_id) = workflow_id {
            Self::handle_workflow_change(wf_id, workflow_subscriptions, session_server, db_pool)
                .await;
        }
    }

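    /// Static twin of [`Reactor::fetch_job_data`] for use inside the spawned
    /// reactor task, where `&self` is not available. The same pattern applies
    /// to the workflow and query helpers below.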
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found (static), using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies (static)"
                    );
                }

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

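    /// Naming-convention fallback for queries without compile-time table
    /// dependencies: strips a leading `get_`, `list_`, `find_`, or `fetch_`
    /// prefix, so e.g. `list_users` maps to the `users` table.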
    fn extract_table_name(query_name: &str) -> String {
        if let Some(rest) = query_name.strip_prefix("get_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("list_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("find_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
            rest.to_string()
        } else {
            query_name.to_string()
        }
    }

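    /// Signals the background reactor task and the change listener to shut
    /// down.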
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

    pub async fn stats(&self) -> ReactorStats {
        let session_stats = self.session_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: session_stats.connections,
            subscriptions: session_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
        assert_eq!(config.max_listener_restarts, 5);
        assert_eq!(config.listener_restart_delay_ms, 1000);
    }

    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
}