use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{RwLock, broadcast, mpsc};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::message::{
    JobData, RealtimeConfig, RealtimeMessage, SessionServer, WorkflowData, WorkflowStepData,
};
use crate::function::{FunctionEntry, FunctionRegistry};

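/// Tuning for the reactor: configuration for its sub-components plus the
/// restart policy applied when the database change listener fails.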
#[derive(Debug, Clone)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub realtime: RealtimeConfig,
    pub max_listener_restarts: u32,
    pub listener_restart_delay_ms: u64,
}

impl Default for ReactorConfig {
    fn default() -> Self {
        Self {
            listener: ListenerConfig::default(),
            invalidation: InvalidationConfig::default(),
            realtime: RealtimeConfig::default(),
            max_listener_restarts: 5,
            listener_restart_delay_ms: 1000,
        }
    }
}

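/// A live query subscription: everything needed to re-execute the query and
/// detect result changes when its read set is invalidated.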
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
    pub auth_context: forge_core::function::AuthContext,
}

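/// A subscription to progress updates for a single background job.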
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid,
}

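/// A subscription to status updates for a single workflow run.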
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid,
}

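/// Drives real-time updates: listens for database change notifications,
/// invalidates affected subscriptions, re-executes their queries, and pushes
/// fresh results to connected sessions.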
pub struct Reactor {
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    session_server: Arc<SessionServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    shutdown_tx: broadcast::Sender<()>,
    max_listener_restarts: u32,
    listener_restart_delay_ms: u64,
}

impl Reactor {
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.realtime.max_subscriptions_per_session,
        ));
        let session_server = Arc::new(SessionServer::new(node_id, config.realtime.clone()));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            session_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
            max_listener_restarts: config.max_listener_restarts,
            listener_restart_delay_ms: config.listener_restart_delay_ms,
        }
    }

    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    pub fn session_server(&self) -> Arc<SessionServer> {
        self.session_server.clone()
    }

    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

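    /// Registers a session's outbound channel so the reactor can push
    /// realtime messages to it.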
    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<RealtimeMessage>,
    ) {
        self.session_server
            .register_connection(session_id, sender)
            .await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

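    /// Tears down all state for a disconnected session: its query, job, and
    /// workflow subscriptions.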
    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.session_server.remove_connection(session_id).await {
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            job_subs.retain(|_, v| !v.is_empty());
        }

        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

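    /// Creates a query subscription: runs the query once, records its read
    /// set and result hash for later change detection, and returns the
    /// initial data.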
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
        auth_context: forge_core::function::AuthContext,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        self.session_server
            .add_subscription(session_id, subscription_id)
            .await?;

        let (data, read_set) = self
            .execute_query(&query_name, &args, &auth_context)
            .await?;

        let result_hash = Self::compute_hash(&data);

        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
            auth_context,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

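    /// Removes a query subscription from the session server, the
    /// subscription manager, and the active set.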
    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.session_server
            .remove_subscription(subscription_id)
            .await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

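    /// Subscribes a session to updates for one job and returns the job's
    /// current state as the initial snapshot.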
    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid,
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        let job_data = self.fetch_job_data(job_id).await?;

        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

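    /// Subscribes a session to updates for one workflow run and returns the
    /// run's current state as the initial snapshot.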
    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid,
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output,
                   COALESCE(cancel_reason, last_error) as error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

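    /// Runs a registered query and derives its read set, preferring
    /// compile-time table dependencies and falling back to a naming
    /// convention on the query name.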
    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found, using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies"
                    );
                }

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

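    /// Hashes a serialized query result for change detection. `DefaultHasher`'s
    /// algorithm is unspecified and may change between Rust releases, so
    /// hashes should only be compared within the same build.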
    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

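    /// Spawns the background task that drives the reactor: runs the change
    /// listener, processes change notifications, and restarts the listener
    /// with exponential backoff when it fails.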
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let session_server = self.session_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();
        let max_restarts = self.max_listener_restarts;
        let base_delay_ms = self.listener_restart_delay_ms;

        let mut change_rx = listener.subscribe();

        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            let mut restart_count: u32 = 0;
            let (listener_error_tx, mut listener_error_rx) = mpsc::channel::<String>(1);

            let listener_clone = listener.clone();
            let error_tx = listener_error_tx.clone();
            let mut listener_handle = Some(tokio::spawn(async move {
                if let Err(e) = listener_clone.run().await {
                    let _ = error_tx.send(format!("Change listener error: {}", e)).await;
                }
            }));

            loop {
                tokio::select! {
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &session_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    Some(error_msg) = listener_error_rx.recv() => {
                        tracing::error!("Listener failed: {}", error_msg);

                        if restart_count >= max_restarts {
                            tracing::error!(
                                "Listener failed {} times, giving up. Real-time updates disabled.",
                                restart_count
                            );
                            break;
                        }

                        restart_count += 1;
                        let delay = base_delay_ms.saturating_mul(2u64.saturating_pow(restart_count - 1));
                        tracing::warn!(
                            "Restarting listener in {}ms (attempt {}/{})",
                            delay, restart_count, max_restarts
                        );

                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;

                        let listener_clone = listener.clone();
                        let error_tx = listener_error_tx.clone();
                        if let Some(handle) = listener_handle.take() {
                            handle.abort();
                        }
                        change_rx = listener.subscribe();
                        listener_handle = Some(tokio::spawn(async move {
                            if let Err(e) = listener_clone.run().await {
                                let _ = error_tx.send(format!("Change listener error: {}", e)).await;
                            }
                        }));
                    }
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            if let Some(handle) = listener_handle {
                handle.abort();
            }
        });

        Ok(())
    }

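    /// Routes one change notification: job and workflow tables get targeted
    /// fan-out; everything else goes through read-set invalidation and query
    /// re-execution.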
    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");

        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, session_server, db_pool)
                        .await;
                }
                return;
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        session_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            "forge_workflow_steps" => {
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        session_server,
                        db_pool,
                    )
                    .await;
                }
                return;
            }
            _ => {}
        }

        invalidation_engine.process_change(change.clone()).await;

        let invalidated = invalidation_engine.check_pending().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.client_sub_id.clone(),
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                            active.auth_context.clone(),
                        )
                    })
                })
                .collect()
        };

        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        for (sub_id, session_id, client_sub_id, query_name, args, last_hash, auth_context) in
            subs_to_process
        {
            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
                .await
            {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    if last_hash.as_ref() != Some(&new_hash) {
                        let message = RealtimeMessage::Data {
                            subscription_id: client_sub_id.clone(),
                            data: new_data,
                        };

                        if let Err(e) = session_server.send_to_session(session_id, message).await {
                            tracing::warn!(client_id = %client_sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(client_id = %client_sub_id, "Pushed update to client");
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(client_id = %client_sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

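    /// Fans a job-table change out to every session subscribed to that job.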
    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        // Release the read lock before the async database fetch.
        drop(subs);

        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = RealtimeMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = session_server
                .send_to_session(sub.session_id, message)
                .await
            {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

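    /// Fans a workflow-run change out to every subscribed session.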
    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return,
        };
        // Release the read lock before the async database fetch.
        drop(subs);

        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        for sub in subscribers {
            let message = RealtimeMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = session_server
                .send_to_session(sub.session_id, message)
                .await
            {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

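    /// Resolves a step change to its parent workflow run and reuses the
    /// workflow fan-out path.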
    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        session_server: &Arc<SessionServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let workflow_id: Option<Uuid> = match sqlx::query_scalar(
            "SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1",
        )
        .bind(step_id)
        .fetch_optional(db_pool)
        .await
        {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(%step_id, "Failed to look up workflow_run_id for step: {}", e);
                return;
            }
        };

        if let Some(wf_id) = workflow_id {
            Self::handle_workflow_change(wf_id, workflow_subscriptions, session_server, db_pool)
                .await;
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, progress_percent, progress_message, output, last_error
            FROM forge_jobs WHERE id = $1
            "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
            SELECT status, current_step, output, error
            FROM forge_workflow_runs WHERE id = $1
            "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found (static), using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies (static)"
                    );
                }

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

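    /// Naming-convention fallback for queries that declare no table
    /// dependencies: strips a `get_`/`list_`/`find_`/`fetch_` prefix, so
    /// `list_users` maps to the `users` table.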
    fn extract_table_name(query_name: &str) -> String {
        if let Some(rest) = query_name.strip_prefix("get_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("list_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("find_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
            rest.to_string()
        } else {
            query_name.to_string()
        }
    }

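    /// Signals shutdown to the background task and stops the change listener.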
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

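    /// Aggregates live counters from the session server, invalidation
    /// engine, and change listener.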
    pub async fn stats(&self) -> ReactorStats {
        let session_stats = self.session_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: session_stats.connections,
            subscriptions: session_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

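/// A point-in-time snapshot of reactor health, suitable for metrics or
/// health endpoints.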
#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
        assert_eq!(config.max_listener_restarts, 5);
        assert_eq!(config.listener_restart_delay_ms, 1000);
    }

    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
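
    // Checks the naming-convention fallback used by execute_query when a
    // query declares no compile-time table dependencies.
    #[test]
    fn test_extract_table_name() {
        assert_eq!(Reactor::extract_table_name("list_users"), "users");
        assert_eq!(Reactor::extract_table_name("get_profile"), "profile");
        assert_eq!(Reactor::extract_table_name("find_orders"), "orders");
        assert_eq!(Reactor::extract_table_name("fetch_items"), "items");
        // Names without a recognized prefix pass through unchanged.
        assert_eq!(Reactor::extract_table_name("dashboard"), "dashboard");
    }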
}