Skip to main content

forge_runtime/realtime/
reactor.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::{RwLock, broadcast, mpsc};
5use uuid::Uuid;
6
7use forge_core::cluster::NodeId;
8use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};
9
10use super::invalidation::{InvalidationConfig, InvalidationEngine};
11use super::listener::{ChangeListener, ListenerConfig};
12use super::manager::SubscriptionManager;
13use super::message::{
14    JobData, RealtimeConfig, RealtimeMessage, SessionServer, WorkflowData, WorkflowStepData,
15};
16use crate::function::{FunctionEntry, FunctionRegistry};
17
18#[derive(Debug, Clone)]
19pub struct ReactorConfig {
20    pub listener: ListenerConfig,
21    pub invalidation: InvalidationConfig,
22    pub realtime: RealtimeConfig,
23    pub max_listener_restarts: u32,
24    /// Doubles with each attempt for exponential backoff
25    pub listener_restart_delay_ms: u64,
26}
27
28impl Default for ReactorConfig {
29    fn default() -> Self {
30        Self {
31            listener: ListenerConfig::default(),
32            invalidation: InvalidationConfig::default(),
33            realtime: RealtimeConfig::default(),
34            max_listener_restarts: 5,
35            listener_restart_delay_ms: 1000,
36        }
37    }
38}
39
40/// Active subscription with execution context.
41#[derive(Debug, Clone)]
42pub struct ActiveSubscription {
43    #[allow(dead_code)]
44    pub subscription_id: SubscriptionId,
45    pub session_id: SessionId,
46    #[allow(dead_code)]
47    pub client_sub_id: String,
48    pub query_name: String,
49    pub args: serde_json::Value,
50    pub last_result_hash: Option<String>,
51    #[allow(dead_code)]
52    pub read_set: ReadSet,
53    /// Auth context for re-executing the query on invalidation.
54    pub auth_context: forge_core::function::AuthContext,
55}
56
57/// Job subscription tracking.
58#[derive(Debug, Clone)]
59pub struct JobSubscription {
60    #[allow(dead_code)]
61    pub subscription_id: SubscriptionId,
62    pub session_id: SessionId,
63    pub client_sub_id: String,
64    #[allow(dead_code)]
65    pub job_id: Uuid, // Validated UUID, not String
66}
67
68/// Workflow subscription tracking.
69#[derive(Debug, Clone)]
70pub struct WorkflowSubscription {
71    #[allow(dead_code)]
72    pub subscription_id: SubscriptionId,
73    pub session_id: SessionId,
74    pub client_sub_id: String,
75    #[allow(dead_code)]
76    pub workflow_id: Uuid, // Validated UUID, not String
77}
78
79/// ChangeListener -> InvalidationEngine -> Query Re-execution -> SSE Push
80pub struct Reactor {
81    node_id: NodeId,
82    db_pool: sqlx::PgPool,
83    registry: FunctionRegistry,
84    subscription_manager: Arc<SubscriptionManager>,
85    session_server: Arc<SessionServer>,
86    change_listener: Arc<ChangeListener>,
87    invalidation_engine: Arc<InvalidationEngine>,
88    /// Active subscriptions with their execution context.
89    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
90    /// Job subscriptions: job_id -> list of subscribers.
91    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
92    /// Workflow subscriptions: workflow_id -> list of subscribers.
93    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
94    /// Shutdown signal.
95    shutdown_tx: broadcast::Sender<()>,
96    /// Listener restart configuration
97    max_listener_restarts: u32,
98    listener_restart_delay_ms: u64,
99}
100
101impl Reactor {
102    /// Create a new reactor.
103    pub fn new(
104        node_id: NodeId,
105        db_pool: sqlx::PgPool,
106        registry: FunctionRegistry,
107        config: ReactorConfig,
108    ) -> Self {
109        let subscription_manager = Arc::new(SubscriptionManager::new(
110            config.realtime.max_subscriptions_per_session,
111        ));
112        let session_server = Arc::new(SessionServer::new(node_id, config.realtime.clone()));
113        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
114        let invalidation_engine = Arc::new(InvalidationEngine::new(
115            subscription_manager.clone(),
116            config.invalidation,
117        ));
118        let (shutdown_tx, _) = broadcast::channel(1);
119
120        Self {
121            node_id,
122            db_pool,
123            registry,
124            subscription_manager,
125            session_server,
126            change_listener,
127            invalidation_engine,
128            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
129            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
130            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
131            shutdown_tx,
132            max_listener_restarts: config.max_listener_restarts,
133            listener_restart_delay_ms: config.listener_restart_delay_ms,
134        }
135    }
136
137    /// Get the node ID.
138    pub fn node_id(&self) -> NodeId {
139        self.node_id
140    }
141
142    /// Get the session server reference.
143    pub fn session_server(&self) -> Arc<SessionServer> {
144        self.session_server.clone()
145    }
146
147    /// Get the subscription manager reference.
148    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
149        self.subscription_manager.clone()
150    }
151
152    /// Get a shutdown receiver.
153    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
154        self.shutdown_tx.subscribe()
155    }
156
157    /// Register a new session.
158    pub async fn register_session(
159        &self,
160        session_id: SessionId,
161        sender: mpsc::Sender<RealtimeMessage>,
162    ) {
163        self.session_server
164            .register_connection(session_id, sender)
165            .await;
166        tracing::debug!(?session_id, "Session registered with reactor");
167    }
168
169    /// Remove a session and all its subscriptions.
170    pub async fn remove_session(&self, session_id: SessionId) {
171        if let Some(subscription_ids) = self.session_server.remove_connection(session_id).await {
172            // Clean up query subscriptions
173            for sub_id in subscription_ids {
174                self.subscription_manager.remove_subscription(sub_id).await;
175                self.active_subscriptions.write().await.remove(&sub_id);
176            }
177        }
178
179        // Clean up job subscriptions for this session
180        {
181            let mut job_subs = self.job_subscriptions.write().await;
182            for subscribers in job_subs.values_mut() {
183                subscribers.retain(|s| s.session_id != session_id);
184            }
185            // Remove empty entries
186            job_subs.retain(|_, v| !v.is_empty());
187        }
188
189        // Clean up workflow subscriptions for this session
190        {
191            let mut workflow_subs = self.workflow_subscriptions.write().await;
192            for subscribers in workflow_subs.values_mut() {
193                subscribers.retain(|s| s.session_id != session_id);
194            }
195            // Remove empty entries
196            workflow_subs.retain(|_, v| !v.is_empty());
197        }
198
199        tracing::debug!(?session_id, "Session removed from reactor");
200    }
201
202    /// Subscribe to a query.
203    pub async fn subscribe(
204        &self,
205        session_id: SessionId,
206        client_sub_id: String,
207        query_name: String,
208        args: serde_json::Value,
209        auth_context: forge_core::function::AuthContext,
210    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
211        let sub_info = self
212            .subscription_manager
213            .create_subscription(session_id, &query_name, args.clone())
214            .await?;
215
216        let subscription_id = sub_info.id;
217
218        self.session_server
219            .add_subscription(session_id, subscription_id)
220            .await?;
221
222        let (data, read_set) = self
223            .execute_query(&query_name, &args, &auth_context)
224            .await?;
225
226        let result_hash = Self::compute_hash(&data);
227
228        let tables: Vec<_> = read_set.tables.iter().collect();
229        tracing::debug!(
230            ?subscription_id,
231            query_name = %query_name,
232            read_set_tables = ?tables,
233            "Updating subscription with read set"
234        );
235
236        self.subscription_manager
237            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
238            .await;
239
240        let active = ActiveSubscription {
241            subscription_id,
242            session_id,
243            client_sub_id,
244            query_name,
245            args,
246            last_result_hash: Some(result_hash),
247            read_set,
248            auth_context,
249        };
250        self.active_subscriptions
251            .write()
252            .await
253            .insert(subscription_id, active);
254
255        tracing::debug!(?subscription_id, "Subscription created");
256
257        Ok((subscription_id, data))
258    }
259
260    /// Unsubscribe from a query.
261    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
262        self.session_server
263            .remove_subscription(subscription_id)
264            .await;
265        self.subscription_manager
266            .remove_subscription(subscription_id)
267            .await;
268        self.active_subscriptions
269            .write()
270            .await
271            .remove(&subscription_id);
272        tracing::debug!(?subscription_id, "Subscription removed");
273    }
274
275    /// Subscribe to job progress updates.
276    pub async fn subscribe_job(
277        &self,
278        session_id: SessionId,
279        client_sub_id: String,
280        job_id: Uuid, // Pre-validated UUID
281    ) -> forge_core::Result<JobData> {
282        let subscription_id = SubscriptionId::new();
283
284        // Fetch current job state from database
285        let job_data = self.fetch_job_data(job_id).await?;
286
287        // Register subscription
288        let subscription = JobSubscription {
289            subscription_id,
290            session_id,
291            client_sub_id: client_sub_id.clone(),
292            job_id,
293        };
294
295        let mut subs = self.job_subscriptions.write().await;
296        subs.entry(job_id).or_default().push(subscription);
297
298        tracing::debug!(
299            ?subscription_id,
300            client_id = %client_sub_id,
301            %job_id,
302            "Job subscription created"
303        );
304
305        Ok(job_data)
306    }
307
308    /// Unsubscribe from job updates.
309    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
310        let mut subs = self.job_subscriptions.write().await;
311
312        // Find and remove the subscription
313        for subscribers in subs.values_mut() {
314            subscribers
315                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
316        }
317
318        // Remove empty entries
319        subs.retain(|_, v| !v.is_empty());
320
321        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
322    }
323
324    /// Subscribe to workflow progress updates.
325    pub async fn subscribe_workflow(
326        &self,
327        session_id: SessionId,
328        client_sub_id: String,
329        workflow_id: Uuid, // Pre-validated UUID
330    ) -> forge_core::Result<WorkflowData> {
331        let subscription_id = SubscriptionId::new();
332
333        // Fetch current workflow + steps from database
334        let workflow_data = self.fetch_workflow_data(workflow_id).await?;
335
336        // Register subscription
337        let subscription = WorkflowSubscription {
338            subscription_id,
339            session_id,
340            client_sub_id: client_sub_id.clone(),
341            workflow_id,
342        };
343
344        let mut subs = self.workflow_subscriptions.write().await;
345        subs.entry(workflow_id).or_default().push(subscription);
346
347        tracing::debug!(
348            ?subscription_id,
349            client_id = %client_sub_id,
350            %workflow_id,
351            "Workflow subscription created"
352        );
353
354        Ok(workflow_data)
355    }
356
357    /// Unsubscribe from workflow updates.
358    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
359        let mut subs = self.workflow_subscriptions.write().await;
360
361        // Find and remove the subscription
362        for subscribers in subs.values_mut() {
363            subscribers
364                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
365        }
366
367        // Remove empty entries
368        subs.retain(|_, v| !v.is_empty());
369
370        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
371    }
372
373    /// Fetch current job data from database.
374    #[allow(clippy::type_complexity)]
375    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
376        let row: Option<(
377            String,
378            Option<i32>,
379            Option<String>,
380            Option<serde_json::Value>,
381            Option<String>,
382        )> = sqlx::query_as(
383            r#"
384                SELECT status, progress_percent, progress_message, output,
385                       COALESCE(cancel_reason, last_error) as error
386                FROM forge_jobs WHERE id = $1
387                "#,
388        )
389        .bind(job_id)
390        .fetch_optional(&self.db_pool)
391        .await
392        .map_err(forge_core::ForgeError::Sql)?;
393
394        match row {
395            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
396                job_id: job_id.to_string(),
397                status,
398                progress_percent,
399                progress_message,
400                output,
401                error,
402            }),
403            None => Err(forge_core::ForgeError::NotFound(format!(
404                "Job {} not found",
405                job_id
406            ))),
407        }
408    }
409
410    /// Fetch current workflow + steps from database.
411    #[allow(clippy::type_complexity)]
412    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
413        // Fetch workflow run
414        let row: Option<(
415            String,
416            Option<String>,
417            Option<serde_json::Value>,
418            Option<String>,
419        )> = sqlx::query_as(
420            r#"
421                SELECT status, current_step, output, error
422                FROM forge_workflow_runs WHERE id = $1
423                "#,
424        )
425        .bind(workflow_id)
426        .fetch_optional(&self.db_pool)
427        .await
428        .map_err(forge_core::ForgeError::Sql)?;
429
430        let (status, current_step, output, error) = match row {
431            Some(r) => r,
432            None => {
433                return Err(forge_core::ForgeError::NotFound(format!(
434                    "Workflow {} not found",
435                    workflow_id
436                )));
437            }
438        };
439
440        // Fetch workflow steps
441        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
442            r#"
443            SELECT step_name, status, error
444            FROM forge_workflow_steps
445            WHERE workflow_run_id = $1
446            ORDER BY started_at ASC NULLS LAST
447            "#,
448        )
449        .bind(workflow_id)
450        .fetch_all(&self.db_pool)
451        .await
452        .map_err(forge_core::ForgeError::Sql)?;
453
454        let steps = step_rows
455            .into_iter()
456            .map(|(name, status, error)| WorkflowStepData {
457                name,
458                status,
459                error,
460            })
461            .collect();
462
463        Ok(WorkflowData {
464            workflow_id: workflow_id.to_string(),
465            status,
466            current_step,
467            steps,
468            output,
469            error,
470        })
471    }
472
473    /// Execute a query and return data with read set.
474    async fn execute_query(
475        &self,
476        query_name: &str,
477        args: &serde_json::Value,
478        auth_context: &forge_core::function::AuthContext,
479    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
480        match self.registry.get(query_name) {
481            Some(FunctionEntry::Query { info, handler }) => {
482                let ctx = forge_core::function::QueryContext::new(
483                    self.db_pool.clone(),
484                    auth_context.clone(),
485                    forge_core::function::RequestMetadata::new(),
486                );
487
488                // Normalize args
489                let normalized_args = match args {
490                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
491                        serde_json::Value::Null
492                    }
493                    v => v.clone(),
494                };
495
496                let data = handler(&ctx, normalized_args).await?;
497
498                // Create read set from compile-time extracted table dependencies
499                let mut read_set = ReadSet::new();
500
501                if info.table_dependencies.is_empty() {
502                    // Fallback: no tables extracted (dynamic SQL)
503                    // Use naming convention as last resort
504                    let table_name = Self::extract_table_name(query_name);
505                    read_set.add_table(&table_name);
506                    tracing::debug!(
507                        query = %query_name,
508                        fallback_table = %table_name,
509                        "No compile-time table dependencies found, using naming convention fallback"
510                    );
511                } else {
512                    // Use compile-time extracted tables
513                    for table in info.table_dependencies {
514                        read_set.add_table(*table);
515                    }
516                    tracing::debug!(
517                        query = %query_name,
518                        tables = ?info.table_dependencies,
519                        "Using compile-time table dependencies"
520                    );
521                }
522
523                Ok((data, read_set))
524            }
525            Some(_) => Err(forge_core::ForgeError::Validation(format!(
526                "'{}' is not a query",
527                query_name
528            ))),
529            None => Err(forge_core::ForgeError::Validation(format!(
530                "Query '{}' not found",
531                query_name
532            ))),
533        }
534    }
535
536    /// Compute a hash of the result for delta detection.
537    fn compute_hash(data: &serde_json::Value) -> String {
538        use std::collections::hash_map::DefaultHasher;
539        use std::hash::{Hash, Hasher};
540
541        let json = serde_json::to_string(data).unwrap_or_default();
542        let mut hasher = DefaultHasher::new();
543        json.hash(&mut hasher);
544        format!("{:x}", hasher.finish())
545    }
546
547    /// Start the reactor (runs the change listener and invalidation loop).
548    pub async fn start(&self) -> forge_core::Result<()> {
549        let listener = self.change_listener.clone();
550        let invalidation_engine = self.invalidation_engine.clone();
551        let active_subscriptions = self.active_subscriptions.clone();
552        let job_subscriptions = self.job_subscriptions.clone();
553        let workflow_subscriptions = self.workflow_subscriptions.clone();
554        let session_server = self.session_server.clone();
555        let registry = self.registry.clone();
556        let db_pool = self.db_pool.clone();
557        let mut shutdown_rx = self.shutdown_tx.subscribe();
558        let max_restarts = self.max_listener_restarts;
559        let base_delay_ms = self.listener_restart_delay_ms;
560
561        // Subscribe to changes
562        let mut change_rx = listener.subscribe();
563
564        // Main reactor loop
565        tokio::spawn(async move {
566            tracing::info!("Reactor started, listening for changes");
567
568            let mut restart_count: u32 = 0;
569            let (listener_error_tx, mut listener_error_rx) = mpsc::channel::<String>(1);
570
571            // Start initial listener
572            let listener_clone = listener.clone();
573            let error_tx = listener_error_tx.clone();
574            let mut listener_handle = Some(tokio::spawn(async move {
575                if let Err(e) = listener_clone.run().await {
576                    let _ = error_tx.send(format!("Change listener error: {}", e)).await;
577                }
578            }));
579
580            loop {
581                tokio::select! {
582                    result = change_rx.recv() => {
583                        match result {
584                            Ok(change) => {
585                                Self::handle_change(
586                                    &change,
587                                    &invalidation_engine,
588                                    &active_subscriptions,
589                                    &job_subscriptions,
590                                    &workflow_subscriptions,
591                                    &session_server,
592                                    &registry,
593                                    &db_pool,
594                                ).await;
595                            }
596                            Err(broadcast::error::RecvError::Lagged(n)) => {
597                                tracing::warn!("Reactor lagged by {} messages", n);
598                            }
599                            Err(broadcast::error::RecvError::Closed) => {
600                                tracing::info!("Change channel closed");
601                                break;
602                            }
603                        }
604                    }
605                    Some(error_msg) = listener_error_rx.recv() => {
606                        tracing::error!("Listener failed: {}", error_msg);
607
608                        if restart_count >= max_restarts {
609                            tracing::error!(
610                                "Listener failed {} times, giving up. Real-time updates disabled.",
611                                restart_count
612                            );
613                            break;
614                        }
615
616                        restart_count += 1;
617                        let delay = base_delay_ms * 2u64.saturating_pow(restart_count - 1);
618                        tracing::warn!(
619                            "Restarting listener in {}ms (attempt {}/{})",
620                            delay, restart_count, max_restarts
621                        );
622
623                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
624
625                        // Restart listener
626                        let listener_clone = listener.clone();
627                        let error_tx = listener_error_tx.clone();
628                        if let Some(handle) = listener_handle.take() {
629                            handle.abort();
630                        }
631                        change_rx = listener.subscribe();
632                        listener_handle = Some(tokio::spawn(async move {
633                            if let Err(e) = listener_clone.run().await {
634                                let _ = error_tx.send(format!("Change listener error: {}", e)).await;
635                            }
636                        }));
637                    }
638                    _ = shutdown_rx.recv() => {
639                        tracing::info!("Reactor shutting down");
640                        break;
641                    }
642                }
643            }
644
645            if let Some(handle) = listener_handle {
646                handle.abort();
647            }
648        });
649
650        Ok(())
651    }
652
653    /// Handle a database change event.
654    #[allow(clippy::too_many_arguments)]
655    async fn handle_change(
656        change: &Change,
657        invalidation_engine: &Arc<InvalidationEngine>,
658        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
659        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
660        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
661        session_server: &Arc<SessionServer>,
662        registry: &FunctionRegistry,
663        db_pool: &sqlx::PgPool,
664    ) {
665        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");
666
667        // Handle job/workflow table changes first
668        match change.table.as_str() {
669            "forge_jobs" => {
670                if let Some(job_id) = change.row_id {
671                    Self::handle_job_change(job_id, job_subscriptions, session_server, db_pool)
672                        .await;
673                }
674                return; // Don't process through query invalidation
675            }
676            "forge_workflow_runs" => {
677                if let Some(workflow_id) = change.row_id {
678                    Self::handle_workflow_change(
679                        workflow_id,
680                        workflow_subscriptions,
681                        session_server,
682                        db_pool,
683                    )
684                    .await;
685                }
686                return; // Don't process through query invalidation
687            }
688            "forge_workflow_steps" => {
689                // For step changes, need to look up the parent workflow_id
690                if let Some(step_id) = change.row_id {
691                    Self::handle_workflow_step_change(
692                        step_id,
693                        workflow_subscriptions,
694                        session_server,
695                        db_pool,
696                    )
697                    .await;
698                }
699                return; // Don't process through query invalidation
700            }
701            _ => {}
702        }
703
704        // Process change through invalidation engine for query subscriptions
705        invalidation_engine.process_change(change.clone()).await;
706
707        // Check for subscriptions ready to invalidate based on debounce windows:
708        // - 50ms quiet period after last change
709        // - 200ms max wait from first change
710        // This prevents flooding during high-frequency updates (bulk inserts, rapid edits)
711        let invalidated = invalidation_engine.check_pending().await;
712
713        if invalidated.is_empty() {
714            return;
715        }
716
717        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");
718
719        // Collect subscription info under read lock, then release before async operations
720        let subs_to_process: Vec<_> = {
721            let subscriptions = active_subscriptions.read().await;
722            invalidated
723                .iter()
724                .filter_map(|sub_id| {
725                    subscriptions.get(sub_id).map(|active| {
726                        (
727                            *sub_id,
728                            active.session_id,
729                            active.client_sub_id.clone(),
730                            active.query_name.clone(),
731                            active.args.clone(),
732                            active.last_result_hash.clone(),
733                            active.auth_context.clone(),
734                        )
735                    })
736                })
737                .collect()
738        };
739
740        // Track updates to apply after processing
741        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();
742
743        // Re-execute invalidated queries and push updates (without holding locks)
744        for (sub_id, session_id, client_sub_id, query_name, args, last_hash, auth_context) in
745            subs_to_process
746        {
747            // Re-execute the query
748            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
749                .await
750            {
751                Ok((new_data, _read_set)) => {
752                    let new_hash = Self::compute_hash(&new_data);
753
754                    // Only push if data changed
755                    if last_hash.as_ref() != Some(&new_hash) {
756                        // Send updated data to client using client_sub_id for SSE target matching
757                        let message = RealtimeMessage::Data {
758                            subscription_id: client_sub_id.clone(),
759                            data: new_data,
760                        };
761
762                        if let Err(e) = session_server.send_to_session(session_id, message).await {
763                            tracing::warn!(client_id = %client_sub_id, "Failed to send update: {}", e);
764                        } else {
765                            tracing::debug!(client_id = %client_sub_id, "Pushed update to client");
766                            // Track the hash update
767                            updates.push((sub_id, new_hash));
768                        }
769                    }
770                }
771                Err(e) => {
772                    tracing::error!(client_id = %client_sub_id, "Failed to re-execute query: {}", e);
773                }
774            }
775        }
776
777        // Update hashes for successfully sent updates
778        if !updates.is_empty() {
779            let mut subscriptions = active_subscriptions.write().await;
780            for (sub_id, new_hash) in updates {
781                if let Some(active) = subscriptions.get_mut(&sub_id) {
782                    active.last_result_hash = Some(new_hash);
783                }
784            }
785        }
786    }
787
788    /// Handle a job table change event.
789    async fn handle_job_change(
790        job_id: Uuid,
791        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
792        session_server: &Arc<SessionServer>,
793        db_pool: &sqlx::PgPool,
794    ) {
795        let subs = job_subscriptions.read().await;
796        let subscribers = match subs.get(&job_id) {
797            Some(s) if !s.is_empty() => s.clone(),
798            _ => return, // No subscribers for this job
799        };
800        drop(subs); // Release lock before async operations
801
802        // Fetch latest job state
803        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
804            Ok(data) => data,
805            Err(e) => {
806                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
807                return;
808            }
809        };
810
811        // Push to all subscribers
812        for sub in subscribers {
813            let message = RealtimeMessage::JobUpdate {
814                client_sub_id: sub.client_sub_id.clone(),
815                job: job_data.clone(),
816            };
817
818            if let Err(e) = session_server
819                .send_to_session(sub.session_id, message)
820                .await
821            {
822                // Debug level because this commonly happens when session disconnects (page refresh)
823                tracing::debug!(
824                    %job_id,
825                    client_id = %sub.client_sub_id,
826                    "Failed to send job update (session likely disconnected): {}",
827                    e
828                );
829            } else {
830                tracing::debug!(
831                    %job_id,
832                    client_id = %sub.client_sub_id,
833                    "Pushed job update to client"
834                );
835            }
836        }
837    }
838
839    /// Handle a workflow table change event.
840    async fn handle_workflow_change(
841        workflow_id: Uuid,
842        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
843        session_server: &Arc<SessionServer>,
844        db_pool: &sqlx::PgPool,
845    ) {
846        let subs = workflow_subscriptions.read().await;
847        let subscribers = match subs.get(&workflow_id) {
848            Some(s) if !s.is_empty() => s.clone(),
849            _ => return, // No subscribers for this workflow
850        };
851        drop(subs); // Release lock before async operations
852
853        // Fetch latest workflow + steps state
854        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
855            Ok(data) => data,
856            Err(e) => {
857                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
858                return;
859            }
860        };
861
862        // Push to all subscribers
863        for sub in subscribers {
864            let message = RealtimeMessage::WorkflowUpdate {
865                client_sub_id: sub.client_sub_id.clone(),
866                workflow: workflow_data.clone(),
867            };
868
869            if let Err(e) = session_server
870                .send_to_session(sub.session_id, message)
871                .await
872            {
873                // Debug level because this commonly happens when session disconnects (page refresh)
874                tracing::debug!(
875                    %workflow_id,
876                    client_id = %sub.client_sub_id,
877                    "Failed to send workflow update (session likely disconnected): {}",
878                    e
879                );
880            } else {
881                tracing::debug!(
882                    %workflow_id,
883                    client_id = %sub.client_sub_id,
884                    "Pushed workflow update to client"
885                );
886            }
887        }
888    }
889
890    /// Handle a workflow step change event.
891    async fn handle_workflow_step_change(
892        step_id: Uuid,
893        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
894        session_server: &Arc<SessionServer>,
895        db_pool: &sqlx::PgPool,
896    ) {
897        // Look up the workflow_run_id for this step
898        let workflow_id: Option<Uuid> = match sqlx::query_scalar(
899            "SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1",
900        )
901        .bind(step_id)
902        .fetch_optional(db_pool)
903        .await
904        {
905            Ok(id) => id,
906            Err(e) => {
907                tracing::warn!(%step_id, "Failed to look up workflow_run_id for step: {}", e);
908                return;
909            }
910        };
911
912        if let Some(wf_id) = workflow_id {
913            // Delegate to workflow change handler
914            Self::handle_workflow_change(wf_id, workflow_subscriptions, session_server, db_pool)
915                .await;
916        }
917    }
918
919    /// Static version of fetch_job_data for use in handle_change.
920    #[allow(clippy::type_complexity)]
921    async fn fetch_job_data_static(
922        job_id: Uuid,
923        db_pool: &sqlx::PgPool,
924    ) -> forge_core::Result<JobData> {
925        let row: Option<(
926            String,
927            Option<i32>,
928            Option<String>,
929            Option<serde_json::Value>,
930            Option<String>,
931        )> = sqlx::query_as(
932            r#"
933                SELECT status, progress_percent, progress_message, output, last_error
934                FROM forge_jobs WHERE id = $1
935                "#,
936        )
937        .bind(job_id)
938        .fetch_optional(db_pool)
939        .await
940        .map_err(forge_core::ForgeError::Sql)?;
941
942        match row {
943            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
944                job_id: job_id.to_string(),
945                status,
946                progress_percent,
947                progress_message,
948                output,
949                error,
950            }),
951            None => Err(forge_core::ForgeError::NotFound(format!(
952                "Job {} not found",
953                job_id
954            ))),
955        }
956    }
957
958    /// Static version of fetch_workflow_data for use in handle_change.
959    #[allow(clippy::type_complexity)]
960    async fn fetch_workflow_data_static(
961        workflow_id: Uuid,
962        db_pool: &sqlx::PgPool,
963    ) -> forge_core::Result<WorkflowData> {
964        let row: Option<(
965            String,
966            Option<String>,
967            Option<serde_json::Value>,
968            Option<String>,
969        )> = sqlx::query_as(
970            r#"
971                SELECT status, current_step, output, error
972                FROM forge_workflow_runs WHERE id = $1
973                "#,
974        )
975        .bind(workflow_id)
976        .fetch_optional(db_pool)
977        .await
978        .map_err(forge_core::ForgeError::Sql)?;
979
980        let (status, current_step, output, error) = match row {
981            Some(r) => r,
982            None => {
983                return Err(forge_core::ForgeError::NotFound(format!(
984                    "Workflow {} not found",
985                    workflow_id
986                )));
987            }
988        };
989
990        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
991            r#"
992            SELECT step_name, status, error
993            FROM forge_workflow_steps
994            WHERE workflow_run_id = $1
995            ORDER BY started_at ASC NULLS LAST
996            "#,
997        )
998        .bind(workflow_id)
999        .fetch_all(db_pool)
1000        .await
1001        .map_err(forge_core::ForgeError::Sql)?;
1002
1003        let steps = step_rows
1004            .into_iter()
1005            .map(|(name, status, error)| WorkflowStepData {
1006                name,
1007                status,
1008                error,
1009            })
1010            .collect();
1011
1012        Ok(WorkflowData {
1013            workflow_id: workflow_id.to_string(),
1014            status,
1015            current_step,
1016            steps,
1017            output,
1018            error,
1019        })
1020    }
1021
1022    /// Static version of execute_query for use in async context.
1023    async fn execute_query_static(
1024        registry: &FunctionRegistry,
1025        db_pool: &sqlx::PgPool,
1026        query_name: &str,
1027        args: &serde_json::Value,
1028        auth_context: &forge_core::function::AuthContext,
1029    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
1030        match registry.get(query_name) {
1031            Some(FunctionEntry::Query { info, handler }) => {
1032                let ctx = forge_core::function::QueryContext::new(
1033                    db_pool.clone(),
1034                    auth_context.clone(),
1035                    forge_core::function::RequestMetadata::new(),
1036                );
1037
1038                let normalized_args = match args {
1039                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
1040                        serde_json::Value::Null
1041                    }
1042                    v => v.clone(),
1043                };
1044
1045                let data = handler(&ctx, normalized_args).await?;
1046
1047                // Create read set from compile-time extracted table dependencies
1048                let mut read_set = ReadSet::new();
1049
1050                if info.table_dependencies.is_empty() {
1051                    // Fallback for dynamic SQL
1052                    let table_name = Self::extract_table_name(query_name);
1053                    read_set.add_table(&table_name);
1054                    tracing::debug!(
1055                        query = %query_name,
1056                        fallback_table = %table_name,
1057                        "No compile-time table dependencies found (static), using naming convention fallback"
1058                    );
1059                } else {
1060                    for table in info.table_dependencies {
1061                        read_set.add_table(*table);
1062                    }
1063                    tracing::debug!(
1064                        query = %query_name,
1065                        tables = ?info.table_dependencies,
1066                        "Using compile-time table dependencies (static)"
1067                    );
1068                }
1069
1070                Ok((data, read_set))
1071            }
1072            _ => Err(forge_core::ForgeError::Validation(format!(
1073                "Query '{}' not found or not a query",
1074                query_name
1075            ))),
1076        }
1077    }
1078
1079    /// Extract table name from query name using common patterns.
1080    fn extract_table_name(query_name: &str) -> String {
1081        if let Some(rest) = query_name.strip_prefix("get_") {
1082            rest.to_string()
1083        } else if let Some(rest) = query_name.strip_prefix("list_") {
1084            rest.to_string()
1085        } else if let Some(rest) = query_name.strip_prefix("find_") {
1086            rest.to_string()
1087        } else if let Some(rest) = query_name.strip_prefix("fetch_") {
1088            rest.to_string()
1089        } else {
1090            query_name.to_string()
1091        }
1092    }
1093
    /// Stop the reactor.
    ///
    /// Sends a unit value on `shutdown_tx` and then calls `stop()` on the
    /// change listener, in that order.
    pub fn stop(&self) {
        // The send result is deliberately discarded. Presumably it can only
        // fail when nothing is listening for the shutdown signal — TODO confirm.
        let _ = self.shutdown_tx.send(());
        self.change_listener.stop();
    }
1099
1100    /// Get reactor statistics.
1101    pub async fn stats(&self) -> ReactorStats {
1102        let session_stats = self.session_server.stats().await;
1103        let inv_stats = self.invalidation_engine.stats().await;
1104
1105        ReactorStats {
1106            connections: session_stats.connections,
1107            subscriptions: session_stats.subscriptions,
1108            pending_invalidations: inv_stats.pending_subscriptions,
1109            listener_running: self.change_listener.is_running(),
1110        }
1111    }
1112}
1113
/// Reactor statistics.
///
/// A point-in-time snapshot assembled by the reactor's `stats` method.
#[derive(Debug, Clone)]
pub struct ReactorStats {
    /// Connection count as reported by the session server's stats.
    pub connections: usize,
    /// Subscription count as reported by the session server's stats.
    pub subscriptions: usize,
    /// The invalidation engine's `pending_subscriptions` figure.
    pub pending_invalidations: usize,
    /// Result of `is_running()` on the change listener when the snapshot was taken.
    pub listener_running: bool,
}
1122
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the expected listener channel,
    /// debounce window and listener restart policy.
    #[test]
    fn test_reactor_config_default() {
        let ReactorConfig {
            listener,
            invalidation,
            max_listener_restarts,
            listener_restart_delay_ms,
            ..
        } = ReactorConfig::default();

        assert_eq!(listener.channel, "forge_changes");
        assert_eq!(invalidation.debounce_ms, 50);
        assert_eq!(max_listener_restarts, 5);
        assert_eq!(listener_restart_delay_ms, 1000);
    }

    /// Equal JSON values hash identically; different values hash differently.
    #[test]
    fn test_compute_hash() {
        let original = serde_json::json!({"name": "test"});
        let duplicate = serde_json::json!({"name": "test"});
        let altered = serde_json::json!({"name": "different"});

        let original_hash = Reactor::compute_hash(&original);
        let duplicate_hash = Reactor::compute_hash(&duplicate);
        let altered_hash = Reactor::compute_hash(&altered);

        assert_eq!(original_hash, duplicate_hash);
        assert_ne!(original_hash, altered_hash);
    }
}