forge_runtime/realtime/reactor.rs

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{RwLock, broadcast, mpsc};
use uuid::Uuid;

use forge_core::cluster::NodeId;
use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};

use super::invalidation::{InvalidationConfig, InvalidationEngine};
use super::listener::{ChangeListener, ListenerConfig};
use super::manager::SubscriptionManager;
use super::websocket::{WebSocketConfig, WebSocketMessage, WebSocketServer};
use crate::function::{FunctionEntry, FunctionRegistry};
use crate::gateway::websocket::{JobData, WorkflowData, WorkflowStepData};

/// Reactor configuration.
#[derive(Debug, Clone, Default)]
pub struct ReactorConfig {
    pub listener: ListenerConfig,
    pub invalidation: InvalidationConfig,
    pub websocket: WebSocketConfig,
}

/// Active subscription with execution context.
#[derive(Debug, Clone)]
pub struct ActiveSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    #[allow(dead_code)]
    pub client_sub_id: String,
    pub query_name: String,
    pub args: serde_json::Value,
    pub last_result_hash: Option<String>,
    #[allow(dead_code)]
    pub read_set: ReadSet,
    /// Auth context for re-executing the query on invalidation.
    pub auth_context: forge_core::function::AuthContext,
}

/// Job subscription tracking.
#[derive(Debug, Clone)]
pub struct JobSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub job_id: Uuid, // Validated UUID, not String
}

/// Workflow subscription tracking.
#[derive(Debug, Clone)]
pub struct WorkflowSubscription {
    #[allow(dead_code)]
    pub subscription_id: SubscriptionId,
    pub session_id: SessionId,
    pub client_sub_id: String,
    #[allow(dead_code)]
    pub workflow_id: Uuid, // Validated UUID, not String
}

/// The Reactor orchestrates real-time reactivity.
/// It connects: ChangeListener -> InvalidationEngine -> Query Re-execution -> WebSocket Push
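///
/// A minimal wiring sketch (illustrative only; assumes a connected `sqlx::PgPool`,
/// a `NodeId`, and a populated `FunctionRegistry` are already in hand):
///
/// ```ignore
/// let reactor = Reactor::new(node_id, db_pool, registry, ReactorConfig::default());
/// reactor.start().await?;
/// // ... register sessions and subscriptions as WebSocket clients connect ...
/// reactor.stop();
/// ```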
pub struct Reactor {
    #[allow(dead_code)]
    node_id: NodeId,
    db_pool: sqlx::PgPool,
    registry: FunctionRegistry,
    subscription_manager: Arc<SubscriptionManager>,
    ws_server: Arc<WebSocketServer>,
    change_listener: Arc<ChangeListener>,
    invalidation_engine: Arc<InvalidationEngine>,
    /// Active subscriptions with their execution context.
    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
    /// Job subscriptions: job_id -> list of subscribers.
    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
    /// Workflow subscriptions: workflow_id -> list of subscribers.
    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
    /// Shutdown signal.
    shutdown_tx: broadcast::Sender<()>,
}

impl Reactor {
    /// Create a new reactor.
    pub fn new(
        node_id: NodeId,
        db_pool: sqlx::PgPool,
        registry: FunctionRegistry,
        config: ReactorConfig,
    ) -> Self {
        let subscription_manager = Arc::new(SubscriptionManager::new(
            config.websocket.max_subscriptions_per_connection,
        ));
        let ws_server = Arc::new(WebSocketServer::new(node_id, config.websocket));
        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
        let invalidation_engine = Arc::new(InvalidationEngine::new(
            subscription_manager.clone(),
            config.invalidation,
        ));
        let (shutdown_tx, _) = broadcast::channel(1);

        Self {
            node_id,
            db_pool,
            registry,
            subscription_manager,
            ws_server,
            change_listener,
            invalidation_engine,
            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
            shutdown_tx,
        }
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the WebSocket server reference.
    pub fn ws_server(&self) -> Arc<WebSocketServer> {
        self.ws_server.clone()
    }

    /// Get the subscription manager reference.
    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
        self.subscription_manager.clone()
    }

    /// Get a shutdown receiver.
    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    /// Register a new WebSocket session.
    pub async fn register_session(
        &self,
        session_id: SessionId,
        sender: mpsc::Sender<WebSocketMessage>,
    ) {
        self.ws_server.register_connection(session_id, sender).await;
        tracing::debug!(?session_id, "Session registered with reactor");
    }

    /// Remove a session and all its subscriptions.
    pub async fn remove_session(&self, session_id: SessionId) {
        if let Some(subscription_ids) = self.ws_server.remove_connection(session_id).await {
            // Clean up query subscriptions
            for sub_id in subscription_ids {
                self.subscription_manager.remove_subscription(sub_id).await;
                self.active_subscriptions.write().await.remove(&sub_id);
            }
        }

        // Clean up job subscriptions for this session
        {
            let mut job_subs = self.job_subscriptions.write().await;
            for subscribers in job_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            // Remove empty entries
            job_subs.retain(|_, v| !v.is_empty());
        }

        // Clean up workflow subscriptions for this session
        {
            let mut workflow_subs = self.workflow_subscriptions.write().await;
            for subscribers in workflow_subs.values_mut() {
                subscribers.retain(|s| s.session_id != session_id);
            }
            // Remove empty entries
            workflow_subs.retain(|_, v| !v.is_empty());
        }

        tracing::debug!(?session_id, "Session removed from reactor");
    }

    /// Subscribe to a query.
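    ///
    /// Executes the query once to produce the initial result, records its read set
    /// and result hash for later invalidation, and returns the new subscription ID
    /// together with the initial data.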
    pub async fn subscribe(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        query_name: String,
        args: serde_json::Value,
        auth_context: forge_core::function::AuthContext,
    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
        let sub_info = self
            .subscription_manager
            .create_subscription(session_id, &query_name, args.clone())
            .await?;

        let subscription_id = sub_info.id;

        self.ws_server
            .add_subscription(session_id, subscription_id)
            .await?;

        let (data, read_set) = self
            .execute_query(&query_name, &args, &auth_context)
            .await?;

        let result_hash = Self::compute_hash(&data);

        let tables: Vec<_> = read_set.tables.iter().collect();
        tracing::debug!(
            ?subscription_id,
            query_name = %query_name,
            read_set_tables = ?tables,
            "Updating subscription with read set"
        );

        self.subscription_manager
            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
            .await;

        let active = ActiveSubscription {
            subscription_id,
            session_id,
            client_sub_id,
            query_name,
            args,
            last_result_hash: Some(result_hash),
            read_set,
            auth_context,
        };
        self.active_subscriptions
            .write()
            .await
            .insert(subscription_id, active);

        tracing::debug!(?subscription_id, "Subscription created");

        Ok((subscription_id, data))
    }

    /// Unsubscribe from a query.
    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
        self.ws_server.remove_subscription(subscription_id).await;
        self.subscription_manager
            .remove_subscription(subscription_id)
            .await;
        self.active_subscriptions
            .write()
            .await
            .remove(&subscription_id);
        tracing::debug!(?subscription_id, "Subscription removed");
    }

    /// Subscribe to job progress updates.
    pub async fn subscribe_job(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        job_id: Uuid, // Pre-validated UUID
    ) -> forge_core::Result<JobData> {
        let subscription_id = SubscriptionId::new();

        // Fetch current job state from database
        let job_data = self.fetch_job_data(job_id).await?;

        // Register subscription
        let subscription = JobSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            job_id,
        };

        let mut subs = self.job_subscriptions.write().await;
        subs.entry(job_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %job_id,
            "Job subscription created"
        );

        Ok(job_data)
    }

    /// Unsubscribe from job updates.
    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.job_subscriptions.write().await;

        // Find and remove the subscription
        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        // Remove empty entries
        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
    }

    /// Subscribe to workflow progress updates.
    pub async fn subscribe_workflow(
        &self,
        session_id: SessionId,
        client_sub_id: String,
        workflow_id: Uuid, // Pre-validated UUID
    ) -> forge_core::Result<WorkflowData> {
        let subscription_id = SubscriptionId::new();

        // Fetch current workflow + steps from database
        let workflow_data = self.fetch_workflow_data(workflow_id).await?;

        // Register subscription
        let subscription = WorkflowSubscription {
            subscription_id,
            session_id,
            client_sub_id: client_sub_id.clone(),
            workflow_id,
        };

        let mut subs = self.workflow_subscriptions.write().await;
        subs.entry(workflow_id).or_default().push(subscription);

        tracing::debug!(
            ?subscription_id,
            client_id = %client_sub_id,
            %workflow_id,
            "Workflow subscription created"
        );

        Ok(workflow_data)
    }

    /// Unsubscribe from workflow updates.
    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
        let mut subs = self.workflow_subscriptions.write().await;

        // Find and remove the subscription
        for subscribers in subs.values_mut() {
            subscribers
                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
        }

        // Remove empty entries
        subs.retain(|_, v| !v.is_empty());

        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
    }

    /// Fetch current job data from database.
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, progress_percent, progress_message, output, last_error
                FROM forge_jobs WHERE id = $1
                "#,
        )
        .bind(job_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    /// Fetch current workflow + steps from database.
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
        // Fetch workflow run
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, current_step, output, error
                FROM forge_workflow_runs WHERE id = $1
                "#,
        )
        .bind(workflow_id)
        .fetch_optional(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        // Fetch workflow steps
        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(&self.db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    /// Execute a query and return data with read set.
    async fn execute_query(
        &self,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match self.registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    self.db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                // Normalize args
                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                // Create read set from compile-time extracted table dependencies
                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    // Fallback: no tables extracted (dynamic SQL)
                    // Use naming convention as last resort
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found, using naming convention fallback"
                    );
                } else {
                    // Use compile-time extracted tables
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies"
                    );
                }

                Ok((data, read_set))
            }
            Some(_) => Err(forge_core::ForgeError::Validation(format!(
                "'{}' is not a query",
                query_name
            ))),
            None => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found",
                query_name
            ))),
        }
    }

    /// Compute a hash of the result for delta detection.
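    ///
    /// Note: `DefaultHasher`'s algorithm is not guaranteed to be stable across Rust
    /// releases, which should be acceptable here because hashes are only compared
    /// in memory within a single process run.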
    fn compute_hash(data: &serde_json::Value) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let json = serde_json::to_string(data).unwrap_or_default();
        let mut hasher = DefaultHasher::new();
        json.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    /// Start the reactor (runs the change listener and invalidation loop).
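    ///
    /// Spawns two background tasks (the database change listener and the main reactor
    /// loop) and returns immediately; `stop()` signals both to shut down.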
    pub async fn start(&self) -> forge_core::Result<()> {
        let listener = self.change_listener.clone();
        let invalidation_engine = self.invalidation_engine.clone();
        let active_subscriptions = self.active_subscriptions.clone();
        let job_subscriptions = self.job_subscriptions.clone();
        let workflow_subscriptions = self.workflow_subscriptions.clone();
        let ws_server = self.ws_server.clone();
        let registry = self.registry.clone();
        let db_pool = self.db_pool.clone();
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        // Spawn change listener task
        let listener_clone = listener.clone();
        let listener_handle = tokio::spawn(async move {
            if let Err(e) = listener_clone.run().await {
                tracing::error!("Change listener error: {}", e);
            }
        });

        // Subscribe to changes
        let mut change_rx = listener.subscribe();

        // Main reactor loop
        tokio::spawn(async move {
            tracing::info!("Reactor started, listening for changes");

            loop {
                tokio::select! {
                    // Process incoming changes
                    result = change_rx.recv() => {
                        match result {
                            Ok(change) => {
                                Self::handle_change(
                                    &change,
                                    &invalidation_engine,
                                    &active_subscriptions,
                                    &job_subscriptions,
                                    &workflow_subscriptions,
                                    &ws_server,
                                    &registry,
                                    &db_pool,
                                ).await;
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                tracing::warn!("Reactor lagged by {} messages", n);
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                tracing::info!("Change channel closed");
                                break;
                            }
                        }
                    }
                    // Handle shutdown
                    _ = shutdown_rx.recv() => {
                        tracing::info!("Reactor shutting down");
                        break;
                    }
                }
            }

            listener_handle.abort();
        });

        Ok(())
    }

    /// Handle a database change event.
    #[allow(clippy::too_many_arguments)]
    async fn handle_change(
        change: &Change,
        invalidation_engine: &Arc<InvalidationEngine>,
        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
    ) {
        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");

        // Handle job/workflow table changes first
        match change.table.as_str() {
            "forge_jobs" => {
                if let Some(job_id) = change.row_id {
                    Self::handle_job_change(job_id, job_subscriptions, ws_server, db_pool).await;
                }
                return; // Don't process through query invalidation
            }
            "forge_workflow_runs" => {
                if let Some(workflow_id) = change.row_id {
                    Self::handle_workflow_change(
                        workflow_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return; // Don't process through query invalidation
            }
            "forge_workflow_steps" => {
                // For step changes, we need to look up the parent workflow_id
                if let Some(step_id) = change.row_id {
                    Self::handle_workflow_step_change(
                        step_id,
                        workflow_subscriptions,
                        ws_server,
                        db_pool,
                    )
                    .await;
                }
                return; // Don't process through query invalidation
            }
            _ => {}
        }

        // Process change through invalidation engine for query subscriptions
        invalidation_engine.process_change(change.clone()).await;

        // Flush all pending invalidations immediately for real-time updates.
        // Note: a more sophisticated approach would use the invalidation engine's run loop
        // with proper debouncing for high-frequency changes.
        let invalidated = invalidation_engine.flush_all().await;

        if invalidated.is_empty() {
            return;
        }

        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");

        // Collect subscription info under the read lock, then release it before async operations
        let subs_to_process: Vec<_> = {
            let subscriptions = active_subscriptions.read().await;
            invalidated
                .iter()
                .filter_map(|sub_id| {
                    subscriptions.get(sub_id).map(|active| {
                        (
                            *sub_id,
                            active.session_id,
                            active.query_name.clone(),
                            active.args.clone(),
                            active.last_result_hash.clone(),
                            active.auth_context.clone(),
                        )
                    })
                })
                .collect()
        };

        // Track updates to apply after processing
        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();

        // Re-execute invalidated queries and push updates (without holding locks)
        for (sub_id, session_id, query_name, args, last_hash, auth_context) in subs_to_process {
            // Re-execute the query
            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
                .await
            {
                Ok((new_data, _read_set)) => {
                    let new_hash = Self::compute_hash(&new_data);

                    // Only push if the data changed
                    if last_hash.as_ref() != Some(&new_hash) {
                        // Send updated data to the client
                        let message = WebSocketMessage::Data {
                            subscription_id: sub_id,
                            data: new_data,
                        };

                        if let Err(e) = ws_server.send_to_session(session_id, message).await {
                            tracing::warn!(?sub_id, "Failed to send update: {}", e);
                        } else {
                            tracing::debug!(?sub_id, "Pushed update to client");
                            // Track the hash update
                            updates.push((sub_id, new_hash));
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(?sub_id, "Failed to re-execute query: {}", e);
                }
            }
        }

        // Update hashes for successfully sent updates
        if !updates.is_empty() {
            let mut subscriptions = active_subscriptions.write().await;
            for (sub_id, new_hash) in updates {
                if let Some(active) = subscriptions.get_mut(&sub_id) {
                    active.last_result_hash = Some(new_hash);
                }
            }
        }
    }

    /// Handle a job table change event.
    async fn handle_job_change(
        job_id: Uuid,
        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = job_subscriptions.read().await;
        let subscribers = match subs.get(&job_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return, // No subscribers for this job
        };
        drop(subs); // Release lock before async operations

        // Fetch latest job state
        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
                return;
            }
        };

        // Push to all subscribers
        for sub in subscribers {
            let message = WebSocketMessage::JobUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                job: job_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                // Debug level because this commonly happens when the session disconnects (e.g. a page refresh)
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send job update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %job_id,
                    client_id = %sub.client_sub_id,
                    "Pushed job update to client"
                );
            }
        }
    }

    /// Handle a workflow table change event.
    async fn handle_workflow_change(
        workflow_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        let subs = workflow_subscriptions.read().await;
        let subscribers = match subs.get(&workflow_id) {
            Some(s) if !s.is_empty() => s.clone(),
            _ => return, // No subscribers for this workflow
        };
        drop(subs); // Release lock before async operations

        // Fetch latest workflow + steps state
        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
            Ok(data) => data,
            Err(e) => {
                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
                return;
            }
        };

        // Push to all subscribers
        for sub in subscribers {
            let message = WebSocketMessage::WorkflowUpdate {
                client_sub_id: sub.client_sub_id.clone(),
                workflow: workflow_data.clone(),
            };

            if let Err(e) = ws_server.send_to_session(sub.session_id, message).await {
                // Debug level because this commonly happens when the session disconnects (e.g. a page refresh)
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Failed to send workflow update (session likely disconnected): {}",
                    e
                );
            } else {
                tracing::debug!(
                    %workflow_id,
                    client_id = %sub.client_sub_id,
                    "Pushed workflow update to client"
                );
            }
        }
    }

    /// Handle a workflow step change event.
    async fn handle_workflow_step_change(
        step_id: Uuid,
        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
        ws_server: &Arc<WebSocketServer>,
        db_pool: &sqlx::PgPool,
    ) {
        // Look up the workflow_run_id for this step
        let workflow_id: Option<Uuid> =
            sqlx::query_scalar("SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1")
                .bind(step_id)
                .fetch_optional(db_pool)
                .await
                .ok()
                .flatten();

        if let Some(wf_id) = workflow_id {
            // Delegate to workflow change handler
            Self::handle_workflow_change(wf_id, workflow_subscriptions, ws_server, db_pool).await;
        }
    }

    /// Static version of fetch_job_data for use in handle_change.
    #[allow(clippy::type_complexity)]
    async fn fetch_job_data_static(
        job_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<JobData> {
        let row: Option<(
            String,
            Option<i32>,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, progress_percent, progress_message, output, last_error
                FROM forge_jobs WHERE id = $1
                "#,
        )
        .bind(job_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        match row {
            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
                job_id: job_id.to_string(),
                status,
                progress_percent,
                progress_message,
                output,
                error,
            }),
            None => Err(forge_core::ForgeError::NotFound(format!(
                "Job {} not found",
                job_id
            ))),
        }
    }

    /// Static version of fetch_workflow_data for use in handle_change.
    #[allow(clippy::type_complexity)]
    async fn fetch_workflow_data_static(
        workflow_id: Uuid,
        db_pool: &sqlx::PgPool,
    ) -> forge_core::Result<WorkflowData> {
        let row: Option<(
            String,
            Option<String>,
            Option<serde_json::Value>,
            Option<String>,
        )> = sqlx::query_as(
            r#"
                SELECT status, current_step, output, error
                FROM forge_workflow_runs WHERE id = $1
                "#,
        )
        .bind(workflow_id)
        .fetch_optional(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let (status, current_step, output, error) = match row {
            Some(r) => r,
            None => {
                return Err(forge_core::ForgeError::NotFound(format!(
                    "Workflow {} not found",
                    workflow_id
                )));
            }
        };

        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
            r#"
            SELECT step_name, status, error
            FROM forge_workflow_steps
            WHERE workflow_run_id = $1
            ORDER BY started_at ASC NULLS LAST
            "#,
        )
        .bind(workflow_id)
        .fetch_all(db_pool)
        .await
        .map_err(forge_core::ForgeError::Sql)?;

        let steps = step_rows
            .into_iter()
            .map(|(name, status, error)| WorkflowStepData {
                name,
                status,
                error,
            })
            .collect();

        Ok(WorkflowData {
            workflow_id: workflow_id.to_string(),
            status,
            current_step,
            steps,
            output,
            error,
        })
    }

    /// Static version of execute_query for use from the spawned reactor task (no `&self`).
    async fn execute_query_static(
        registry: &FunctionRegistry,
        db_pool: &sqlx::PgPool,
        query_name: &str,
        args: &serde_json::Value,
        auth_context: &forge_core::function::AuthContext,
    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
        match registry.get(query_name) {
            Some(FunctionEntry::Query { info, handler }) => {
                let ctx = forge_core::function::QueryContext::new(
                    db_pool.clone(),
                    auth_context.clone(),
                    forge_core::function::RequestMetadata::new(),
                );

                let normalized_args = match args {
                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
                        serde_json::Value::Null
                    }
                    v => v.clone(),
                };

                let data = handler(&ctx, normalized_args).await?;

                // Create read set from compile-time extracted table dependencies
                let mut read_set = ReadSet::new();

                if info.table_dependencies.is_empty() {
                    // Fallback for dynamic SQL
                    let table_name = Self::extract_table_name(query_name);
                    read_set.add_table(&table_name);
                    tracing::debug!(
                        query = %query_name,
                        fallback_table = %table_name,
                        "No compile-time table dependencies found (static), using naming convention fallback"
                    );
                } else {
                    for table in info.table_dependencies {
                        read_set.add_table(*table);
                    }
                    tracing::debug!(
                        query = %query_name,
                        tables = ?info.table_dependencies,
                        "Using compile-time table dependencies (static)"
                    );
                }

                Ok((data, read_set))
            }
            _ => Err(forge_core::ForgeError::Validation(format!(
                "Query '{}' not found or not a query",
                query_name
            ))),
        }
    }

    /// Extract table name from query name using common patterns.
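    /// e.g. `get_users` -> `users`, `list_tasks` -> `tasks`; names without a
    /// recognized prefix are returned unchanged.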
    fn extract_table_name(query_name: &str) -> String {
        if let Some(rest) = query_name.strip_prefix("get_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("list_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("find_") {
            rest.to_string()
        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
            rest.to_string()
        } else {
            query_name.to_string()
        }
    }

    /// Stop the reactor.
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }

    /// Get reactor statistics.
    pub async fn stats(&self) -> ReactorStats {
        let ws_stats = self.ws_server.stats().await;
        let inv_stats = self.invalidation_engine.stats().await;

        ReactorStats {
            connections: ws_stats.connections,
            subscriptions: ws_stats.subscriptions,
            pending_invalidations: inv_stats.pending_subscriptions,
            listener_running: self.change_listener.is_running(),
        }
    }
}

/// Reactor statistics.
#[derive(Debug, Clone)]
pub struct ReactorStats {
    pub connections: usize,
    pub subscriptions: usize,
    pub pending_invalidations: usize,
    pub listener_running: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reactor_config_default() {
        let config = ReactorConfig::default();
        assert_eq!(config.listener.channel, "forge_changes");
        assert_eq!(config.invalidation.debounce_ms, 50);
    }

    #[test]
    fn test_compute_hash() {
        let data1 = serde_json::json!({"name": "test"});
        let data2 = serde_json::json!({"name": "test"});
        let data3 = serde_json::json!({"name": "different"});

        let hash1 = Reactor::compute_hash(&data1);
        let hash2 = Reactor::compute_hash(&data2);
        let hash3 = Reactor::compute_hash(&data3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }
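
    // Extra coverage for the naming-convention fallback; expected values follow
    // directly from `extract_table_name` above (the query names are arbitrary test inputs).
    #[test]
    fn test_extract_table_name() {
        assert_eq!(Reactor::extract_table_name("get_users"), "users");
        assert_eq!(Reactor::extract_table_name("list_tasks"), "tasks");
        assert_eq!(Reactor::extract_table_name("find_orders"), "orders");
        assert_eq!(Reactor::extract_table_name("fetch_items"), "items");
        assert_eq!(
            Reactor::extract_table_name("dashboard_summary"),
            "dashboard_summary"
        );
    }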
}