forge_runtime/realtime/reactor.rs

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{broadcast, mpsc, RwLock};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::websocket::{WebSocketConfig, WebSocketMessage, WebSocketServer};
use crate::function::{FunctionEntry, FunctionRegistry};
use crate::gateway::websocket::{JobData, WorkflowData, WorkflowStepData};

/// Reactor configuration.
#[derive(Debug, Clone, Default)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub websocket: WebSocketConfig,
}

/// Active subscription with execution context.
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
}

/// Job subscription tracking.
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid, // Validated UUID, not String
}

/// Workflow subscription tracking.
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid, // Validated UUID, not String
}

/// The Reactor orchestrates real-time reactivity.
/// It connects: ChangeListener -> InvalidationEngine -> Query Re-execution -> WebSocket Push
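///
/// A minimal lifecycle sketch (illustrative only, not a complete setup;
/// assumes a connected `sqlx::PgPool`, a populated `FunctionRegistry`, and
/// a `NodeId` for this node):
///
/// ```rust,ignore
/// let reactor = Reactor::new(node_id, db_pool, registry, ReactorConfig::default());
/// reactor.start().await?;  // spawns the change listener and the reactor loop
/// // ... register WebSocket sessions, create subscriptions ...
/// reactor.stop();          // broadcasts shutdown and stops the listener
/// ```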
pub struct Reactor {
    #[allow(dead_code)]
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    ws_server: Arc<WebSocketServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    /// Active subscriptions with their execution context.
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    /// Job subscriptions: job_id -> list of subscribers.
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    /// Workflow subscriptions: workflow_id -> list of subscribers.
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    /// Shutdown signal.
    shutdown_tx: broadcast::Sender<()>,
}

impl Reactor {
    /// Create a new reactor.
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.websocket.max_subscriptions_per_connection,
        ));
        let ws_server = Arc::new(WebSocketServer::new(node_id, config.websocket));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            ws_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
        }
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the WebSocket server reference.
    pub fn ws_server(&self) -> Arc<WebSocketServer> {
        self.ws_server.clone()
    }

    /// Get the subscription manager reference.
    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

    /// Get a shutdown receiver.
    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    /// Register a new WebSocket session.
    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<WebSocketMessage>,
    ) {
        self.ws_server.register_connection(session_id, sender).await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

    /// Remove a session and all its subscriptions.
    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.ws_server.remove_connection(session_id).await {
            // Clean up query subscriptions
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        // Clean up job subscriptions for this session
        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            // Remove empty entries
            job_subs.retain(|_, v| !v.is_empty());
        }

        // Clean up workflow subscriptions for this session
        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            // Remove empty entries
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

    /// Subscribe to a query.
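    ///
    /// Executes the query once for the initial result, records its read set
    /// for invalidation, and stores the result hash for delta detection.
    /// A hedged sketch (the `list_users` query name is illustrative and
    /// assumes a matching entry in the registry):
    ///
    /// ```rust,ignore
    /// let (sub_id, initial) = reactor
    ///     .subscribe(session_id, "sub-1".into(), "list_users".into(), serde_json::json!({}))
    ///     .await?;
    /// // `initial` is the first result; later changes arrive as
    /// // WebSocketMessage::Data pushes for `sub_id`.
    /// ```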
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        // Create subscription in manager
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        // Add to WebSocket server
        self.ws_server
            .add_subscription(session_id, subscription_id)
            .await?;

        // Execute the query to get initial data
        let (data, read_set) = self.execute_query(&query_name, &args).await?;

        // Compute result hash for delta detection
        let result_hash = Self::compute_hash(&data);

        // Update subscription with read set
        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        // Store active subscription
        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

    /// Unsubscribe from a query.
    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.ws_server.remove_subscription(subscription_id).await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

    /// Subscribe to job progress updates.
    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid, // Pre-validated UUID
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        // Fetch current job state from database
        let job_data = self.fetch_job_data(job_id).await?;

        // Register subscription
        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

    /// Unsubscribe from job updates.
    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        // Find and remove the subscription
        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        // Remove empty entries
        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

    /// Subscribe to workflow progress updates.
    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid, // Pre-validated UUID
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        // Fetch current workflow + steps from database
        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        // Register subscription
        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

    /// Unsubscribe from workflow updates.
    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        // Find and remove the subscription
        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        // Remove empty entries
        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

    /// Fetch current job data from database.
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, progress_percent, progress_message, output, last_error
                FROM forge_jobs WHERE id = $1
                "#,
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    /// Fetch current workflow + steps from database.
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        // Fetch workflow run
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, current_step, output, error
                FROM forge_workflow_runs WHERE id = $1
                "#,
        )
        .bind(workflow_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        // Fetch workflow steps
        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    /// Execute a query and return data with read set.
    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { handler, .. }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    forge_core::function::AuthContext::unauthenticated(),
                    forge_core::function::RequestMetadata::new(),
                );

                // Normalize args
                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                // Create a read set based on the query name
                // For queries like "get_users", track the "users" table
                let mut read_set = ReadSet::new();
                let table_name = Self::extract_table_name(query_name);
                read_set.add_table(&table_name);

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

    /// Compute a hash of the result for delta detection.
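    ///
    /// Hashes the serialized JSON with `DefaultHasher`, so this is a cheap
    /// change detector rather than a cryptographic digest. Illustrative:
    ///
    /// ```rust,ignore
    /// let a = Reactor::compute_hash(&serde_json::json!({"n": 1}));
    /// let b = Reactor::compute_hash(&serde_json::json!({"n": 2}));
    /// assert_ne!(a, b); // differing payloads hash differently (with high probability)
    /// ```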
    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    /// Start the reactor (runs the change listener and invalidation loop).
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let ws_server = self.ws_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn change listener task
        let listener_clone = listener.clone();
        let listener_handle = tokio::spawn(async move {
            if let Err(e) = listener_clone.run().await {
                tracing::error!("Change listener error: {}", e);
            }
        });

        // Subscribe to changes
        let mut change_rx = listener.subscribe();

        // Main reactor loop
        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            loop {
                tokio::select! {
                    // Process incoming changes
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &ws_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    // Handle shutdown
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            listener_handle.abort();
        });

        Ok(())
    }

    /// Handle a database change event.
    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");

        // Handle job/workflow table changes first
        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, ws_server, db_pool).await;
                }
                return; // Don't process through query invalidation
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return; // Don't process through query invalidation
            }
            "forge_workflow_steps" => {
                // For step changes, need to look up the parent workflow_id
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return; // Don't process through query invalidation
            }
            _ => {}
        }

        // Process change through invalidation engine for query subscriptions
        invalidation_engine.process_change(change.clone()).await;

        // Flush all pending invalidations immediately for real-time updates
        // Note: A more sophisticated approach would use the invalidation engine's run loop
        // with proper debouncing for high-frequency changes
        let invalidated = invalidation_engine.flush_all().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        // Collect subscription info under read lock, then release before async operations
        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                        )
                    })
                })
                .collect()
        };

        // Track updates to apply after processing
        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        // Re-execute invalidated queries and push updates (without holding locks)
        for (sub_id, session_id, query_name, args, last_hash) in subs_to_process {
            // Re-execute the query
            match Self::execute_query_static(registry, db_pool, &query_name, &args).await {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    // Only push if data changed
                    if last_hash.as_ref() != Some(&new_hash) {
                        // Send updated data to client
                        let message = WebSocketMessage::Data {
                            subscription_id: sub_id,
                            data: new_data,
                        };

                        if let Err(e) = ws_server.send_to_session(session_id, message).await {
                            tracing::warn!(?sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(?sub_id, "Pushed update to client");
                            // Track the hash update
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(?sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        // Update hashes for successfully sent updates
        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

    /// Handle a job table change event.
    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return, // No subscribers for this job
        };
        drop(subs); // Release lock before async operations

        // Fetch latest job state
        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        // Push to all subscribers
        for sub in subscribers {
            let message = WebSocketMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                // Debug level because this commonly happens when session disconnects (page refresh)
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

    /// Handle a workflow table change event.
    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return, // No subscribers for this workflow
        };
        drop(subs); // Release lock before async operations

        // Fetch latest workflow + steps state
        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        // Push to all subscribers
        for sub in subscribers {
            let message = WebSocketMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                // Debug level because this commonly happens when session disconnects (page refresh)
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

    /// Handle a workflow step change event.
    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        // Look up the workflow_run_id for this step
        let workflow_id: Option<Uuid> =
            sqlx::query_scalar("SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1")
                .bind(step_id)
                .fetch_optional(db_pool)
                .await
                .ok()
                .flatten();

        if let Some(wf_id) = workflow_id {
            // Delegate to workflow change handler
            Self::handle_workflow_change(wf_id, workflow_subscriptions, ws_server, db_pool).await;
        }
    }

    /// Static version of fetch_job_data for use in handle_change.
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, progress_percent, progress_message, output, last_error
                FROM forge_jobs WHERE id = $1
                "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    /// Static version of fetch_workflow_data for use in handle_change.
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, current_step, output, error
                FROM forge_workflow_runs WHERE id = $1
                "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    /// Static version of execute_query for use in async context.
    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { handler, .. }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    forge_core::function::AuthContext::unauthenticated(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                // Create a read set based on the query name
                let mut read_set = ReadSet::new();
                let table_name = Self::extract_table_name(query_name);
                read_set.add_table(&table_name);

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

    /// Extract table name from query name using common patterns.
    fn extract_table_name(query_name: &str) -> String {
        if let Some(rest) = query_name.strip_prefix("get_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("list_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("find_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
            rest.to_string()
        } else {
            query_name.to_string()
        }
    }

    /// Stop the reactor.
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

    /// Get reactor statistics.
    pub async fn stats(&self) -> ReactorStats {
        let ws_stats = self.ws_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: ws_stats.connections,
            subscriptions: ws_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

/// Reactor statistics.
#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
    }

    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
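
    // Exercises the prefix rules in extract_table_name: the table is the
    // query name minus a get_/list_/find_/fetch_ prefix, or the name itself
    // when no known prefix matches. Table names here are illustrative.
    #[test]
    fn test_extract_table_name() {
        assert_eq!(Reactor::extract_table_name("get_users"), "users");
        assert_eq!(Reactor::extract_table_name("list_orders"), "orders");
        assert_eq!(Reactor::extract_table_name("find_sessions"), "sessions");
        assert_eq!(Reactor::extract_table_name("fetch_items"), "items");
        // No recognized prefix: the query name is used as-is.
        assert_eq!(
            Reactor::extract_table_name("dashboard_summary"),
            "dashboard_summary"
        );
    }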
}