Skip to main content

forge_runtime/realtime/
reactor.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::{RwLock, broadcast, mpsc};
5use uuid::Uuid;
6
7use forge_core::cluster::NodeId;
8use forge_core::realtime::{Change, ReadSet, SessionId, SubscriptionId};
9
10use super::invalidation::{InvalidationConfig, InvalidationEngine};
11use super::listener::{ChangeListener, ListenerConfig};
12use super::manager::SubscriptionManager;
13use super::message::{
14    JobData, RealtimeConfig, RealtimeMessage, SessionServer, WorkflowData, WorkflowStepData,
15};
16use crate::function::{FunctionEntry, FunctionRegistry};
17
18#[derive(Debug, Clone)]
19pub struct ReactorConfig {
20    pub listener: ListenerConfig,
21    pub invalidation: InvalidationConfig,
22    pub realtime: RealtimeConfig,
23    pub max_listener_restarts: u32,
24    /// Doubles with each attempt for exponential backoff
25    pub listener_restart_delay_ms: u64,
26}
27
28impl Default for ReactorConfig {
29    fn default() -> Self {
30        Self {
31            listener: ListenerConfig::default(),
32            invalidation: InvalidationConfig::default(),
33            realtime: RealtimeConfig::default(),
34            max_listener_restarts: 5,
35            listener_restart_delay_ms: 1000,
36        }
37    }
38}
39
40/// Active subscription with execution context.
41#[derive(Debug, Clone)]
42pub struct ActiveSubscription {
43    #[allow(dead_code)]
44    pub subscription_id: SubscriptionId,
45    pub session_id: SessionId,
46    #[allow(dead_code)]
47    pub client_sub_id: String,
48    pub query_name: String,
49    pub args: serde_json::Value,
50    pub last_result_hash: Option<String>,
51    #[allow(dead_code)]
52    pub read_set: ReadSet,
53    /// Auth context for re-executing the query on invalidation.
54    pub auth_context: forge_core::function::AuthContext,
55}
56
57/// Job subscription tracking.
58#[derive(Debug, Clone)]
59pub struct JobSubscription {
60    #[allow(dead_code)]
61    pub subscription_id: SubscriptionId,
62    pub session_id: SessionId,
63    pub client_sub_id: String,
64    #[allow(dead_code)]
65    pub job_id: Uuid, // Validated UUID, not String
66}
67
68/// Workflow subscription tracking.
69#[derive(Debug, Clone)]
70pub struct WorkflowSubscription {
71    #[allow(dead_code)]
72    pub subscription_id: SubscriptionId,
73    pub session_id: SessionId,
74    pub client_sub_id: String,
75    #[allow(dead_code)]
76    pub workflow_id: Uuid, // Validated UUID, not String
77}
78
79/// ChangeListener -> InvalidationEngine -> Query Re-execution -> SSE Push
80pub struct Reactor {
81    node_id: NodeId,
82    db_pool: sqlx::PgPool,
83    registry: FunctionRegistry,
84    subscription_manager: Arc<SubscriptionManager>,
85    session_server: Arc<SessionServer>,
86    change_listener: Arc<ChangeListener>,
87    invalidation_engine: Arc<InvalidationEngine>,
88    /// Active subscriptions with their execution context.
89    active_subscriptions: Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
90    /// Job subscriptions: job_id -> list of subscribers.
91    job_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
92    /// Workflow subscriptions: workflow_id -> list of subscribers.
93    workflow_subscriptions: Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
94    /// Shutdown signal.
95    shutdown_tx: broadcast::Sender<()>,
96    /// Listener restart configuration
97    max_listener_restarts: u32,
98    listener_restart_delay_ms: u64,
99}
100
101impl Reactor {
102    /// Create a new reactor.
103    pub fn new(
104        node_id: NodeId,
105        db_pool: sqlx::PgPool,
106        registry: FunctionRegistry,
107        config: ReactorConfig,
108    ) -> Self {
109        let subscription_manager = Arc::new(SubscriptionManager::new(
110            config.realtime.max_subscriptions_per_session,
111        ));
112        let session_server = Arc::new(SessionServer::new(node_id, config.realtime.clone()));
113        let change_listener = Arc::new(ChangeListener::new(db_pool.clone(), config.listener));
114        let invalidation_engine = Arc::new(InvalidationEngine::new(
115            subscription_manager.clone(),
116            config.invalidation,
117        ));
118        let (shutdown_tx, _) = broadcast::channel(1);
119
120        Self {
121            node_id,
122            db_pool,
123            registry,
124            subscription_manager,
125            session_server,
126            change_listener,
127            invalidation_engine,
128            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
129            job_subscriptions: Arc::new(RwLock::new(HashMap::new())),
130            workflow_subscriptions: Arc::new(RwLock::new(HashMap::new())),
131            shutdown_tx,
132            max_listener_restarts: config.max_listener_restarts,
133            listener_restart_delay_ms: config.listener_restart_delay_ms,
134        }
135    }
136
137    /// Get the node ID.
138    pub fn node_id(&self) -> NodeId {
139        self.node_id
140    }
141
142    /// Get the session server reference.
143    pub fn session_server(&self) -> Arc<SessionServer> {
144        self.session_server.clone()
145    }
146
147    /// Get the subscription manager reference.
148    pub fn subscription_manager(&self) -> Arc<SubscriptionManager> {
149        self.subscription_manager.clone()
150    }
151
152    /// Get a shutdown receiver.
153    pub fn shutdown_receiver(&self) -> broadcast::Receiver<()> {
154        self.shutdown_tx.subscribe()
155    }
156
157    /// Register a new session.
158    pub async fn register_session(
159        &self,
160        session_id: SessionId,
161        sender: mpsc::Sender<RealtimeMessage>,
162    ) {
163        self.session_server
164            .register_connection(session_id, sender)
165            .await;
166        tracing::debug!(?session_id, "Session registered with reactor");
167    }
168
169    /// Remove a session and all its subscriptions.
170    pub async fn remove_session(&self, session_id: SessionId) {
171        if let Some(subscription_ids) = self.session_server.remove_connection(session_id).await {
172            // Clean up query subscriptions
173            for sub_id in subscription_ids {
174                self.subscription_manager.remove_subscription(sub_id).await;
175                self.active_subscriptions.write().await.remove(&sub_id);
176            }
177        }
178
179        // Clean up job subscriptions for this session
180        {
181            let mut job_subs = self.job_subscriptions.write().await;
182            for subscribers in job_subs.values_mut() {
183                subscribers.retain(|s| s.session_id != session_id);
184            }
185            // Remove empty entries
186            job_subs.retain(|_, v| !v.is_empty());
187        }
188
189        // Clean up workflow subscriptions for this session
190        {
191            let mut workflow_subs = self.workflow_subscriptions.write().await;
192            for subscribers in workflow_subs.values_mut() {
193                subscribers.retain(|s| s.session_id != session_id);
194            }
195            // Remove empty entries
196            workflow_subs.retain(|_, v| !v.is_empty());
197        }
198
199        tracing::debug!(?session_id, "Session removed from reactor");
200    }
201
202    /// Subscribe to a query.
203    pub async fn subscribe(
204        &self,
205        session_id: SessionId,
206        client_sub_id: String,
207        query_name: String,
208        args: serde_json::Value,
209        auth_context: forge_core::function::AuthContext,
210    ) -> forge_core::Result<(SubscriptionId, serde_json::Value)> {
211        let sub_info = self
212            .subscription_manager
213            .create_subscription(session_id, &query_name, args.clone())
214            .await?;
215
216        let subscription_id = sub_info.id;
217
218        self.session_server
219            .add_subscription(session_id, subscription_id)
220            .await?;
221
222        let (data, read_set) = self
223            .execute_query(&query_name, &args, &auth_context)
224            .await?;
225
226        let result_hash = Self::compute_hash(&data);
227
228        let tables: Vec<_> = read_set.tables.iter().collect();
229        tracing::debug!(
230            ?subscription_id,
231            query_name = %query_name,
232            read_set_tables = ?tables,
233            "Updating subscription with read set"
234        );
235
236        self.subscription_manager
237            .update_subscription(subscription_id, read_set.clone(), result_hash.clone())
238            .await;
239
240        let active = ActiveSubscription {
241            subscription_id,
242            session_id,
243            client_sub_id,
244            query_name,
245            args,
246            last_result_hash: Some(result_hash),
247            read_set,
248            auth_context,
249        };
250        self.active_subscriptions
251            .write()
252            .await
253            .insert(subscription_id, active);
254
255        tracing::debug!(?subscription_id, "Subscription created");
256
257        Ok((subscription_id, data))
258    }
259
260    /// Unsubscribe from a query.
261    pub async fn unsubscribe(&self, subscription_id: SubscriptionId) {
262        self.session_server
263            .remove_subscription(subscription_id)
264            .await;
265        self.subscription_manager
266            .remove_subscription(subscription_id)
267            .await;
268        self.active_subscriptions
269            .write()
270            .await
271            .remove(&subscription_id);
272        tracing::debug!(?subscription_id, "Subscription removed");
273    }
274
275    /// Subscribe to job progress updates.
276    pub async fn subscribe_job(
277        &self,
278        session_id: SessionId,
279        client_sub_id: String,
280        job_id: Uuid, // Pre-validated UUID
281    ) -> forge_core::Result<JobData> {
282        let subscription_id = SubscriptionId::new();
283
284        // Fetch current job state from database
285        let job_data = self.fetch_job_data(job_id).await?;
286
287        // Register subscription
288        let subscription = JobSubscription {
289            subscription_id,
290            session_id,
291            client_sub_id: client_sub_id.clone(),
292            job_id,
293        };
294
295        let mut subs = self.job_subscriptions.write().await;
296        subs.entry(job_id).or_default().push(subscription);
297
298        tracing::debug!(
299            ?subscription_id,
300            client_id = %client_sub_id,
301            %job_id,
302            "Job subscription created"
303        );
304
305        Ok(job_data)
306    }
307
308    /// Unsubscribe from job updates.
309    pub async fn unsubscribe_job(&self, session_id: SessionId, client_sub_id: &str) {
310        let mut subs = self.job_subscriptions.write().await;
311
312        // Find and remove the subscription
313        for subscribers in subs.values_mut() {
314            subscribers
315                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
316        }
317
318        // Remove empty entries
319        subs.retain(|_, v| !v.is_empty());
320
321        tracing::debug!(client_id = %client_sub_id, "Job subscription removed");
322    }
323
324    /// Subscribe to workflow progress updates.
325    pub async fn subscribe_workflow(
326        &self,
327        session_id: SessionId,
328        client_sub_id: String,
329        workflow_id: Uuid, // Pre-validated UUID
330    ) -> forge_core::Result<WorkflowData> {
331        let subscription_id = SubscriptionId::new();
332
333        // Fetch current workflow + steps from database
334        let workflow_data = self.fetch_workflow_data(workflow_id).await?;
335
336        // Register subscription
337        let subscription = WorkflowSubscription {
338            subscription_id,
339            session_id,
340            client_sub_id: client_sub_id.clone(),
341            workflow_id,
342        };
343
344        let mut subs = self.workflow_subscriptions.write().await;
345        subs.entry(workflow_id).or_default().push(subscription);
346
347        tracing::debug!(
348            ?subscription_id,
349            client_id = %client_sub_id,
350            %workflow_id,
351            "Workflow subscription created"
352        );
353
354        Ok(workflow_data)
355    }
356
357    /// Unsubscribe from workflow updates.
358    pub async fn unsubscribe_workflow(&self, session_id: SessionId, client_sub_id: &str) {
359        let mut subs = self.workflow_subscriptions.write().await;
360
361        // Find and remove the subscription
362        for subscribers in subs.values_mut() {
363            subscribers
364                .retain(|s| !(s.session_id == session_id && s.client_sub_id == client_sub_id));
365        }
366
367        // Remove empty entries
368        subs.retain(|_, v| !v.is_empty());
369
370        tracing::debug!(client_id = %client_sub_id, "Workflow subscription removed");
371    }
372
373    /// Fetch current job data from database.
374    #[allow(clippy::type_complexity)]
375    async fn fetch_job_data(&self, job_id: Uuid) -> forge_core::Result<JobData> {
376        let row: Option<(
377            String,
378            Option<i32>,
379            Option<String>,
380            Option<serde_json::Value>,
381            Option<String>,
382        )> = sqlx::query_as(
383            r#"
384                SELECT status, progress_percent, progress_message, output, last_error
385                FROM forge_jobs WHERE id = $1
386                "#,
387        )
388        .bind(job_id)
389        .fetch_optional(&self.db_pool)
390        .await
391        .map_err(forge_core::ForgeError::Sql)?;
392
393        match row {
394            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
395                job_id: job_id.to_string(),
396                status,
397                progress_percent,
398                progress_message,
399                output,
400                error,
401            }),
402            None => Err(forge_core::ForgeError::NotFound(format!(
403                "Job {} not found",
404                job_id
405            ))),
406        }
407    }
408
409    /// Fetch current workflow + steps from database.
410    #[allow(clippy::type_complexity)]
411    async fn fetch_workflow_data(&self, workflow_id: Uuid) -> forge_core::Result<WorkflowData> {
412        // Fetch workflow run
413        let row: Option<(
414            String,
415            Option<String>,
416            Option<serde_json::Value>,
417            Option<String>,
418        )> = sqlx::query_as(
419            r#"
420                SELECT status, current_step, output, error
421                FROM forge_workflow_runs WHERE id = $1
422                "#,
423        )
424        .bind(workflow_id)
425        .fetch_optional(&self.db_pool)
426        .await
427        .map_err(forge_core::ForgeError::Sql)?;
428
429        let (status, current_step, output, error) = match row {
430            Some(r) => r,
431            None => {
432                return Err(forge_core::ForgeError::NotFound(format!(
433                    "Workflow {} not found",
434                    workflow_id
435                )));
436            }
437        };
438
439        // Fetch workflow steps
440        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
441            r#"
442            SELECT step_name, status, error
443            FROM forge_workflow_steps
444            WHERE workflow_run_id = $1
445            ORDER BY started_at ASC NULLS LAST
446            "#,
447        )
448        .bind(workflow_id)
449        .fetch_all(&self.db_pool)
450        .await
451        .map_err(forge_core::ForgeError::Sql)?;
452
453        let steps = step_rows
454            .into_iter()
455            .map(|(name, status, error)| WorkflowStepData {
456                name,
457                status,
458                error,
459            })
460            .collect();
461
462        Ok(WorkflowData {
463            workflow_id: workflow_id.to_string(),
464            status,
465            current_step,
466            steps,
467            output,
468            error,
469        })
470    }
471
472    /// Execute a query and return data with read set.
473    async fn execute_query(
474        &self,
475        query_name: &str,
476        args: &serde_json::Value,
477        auth_context: &forge_core::function::AuthContext,
478    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
479        match self.registry.get(query_name) {
480            Some(FunctionEntry::Query { info, handler }) => {
481                let ctx = forge_core::function::QueryContext::new(
482                    self.db_pool.clone(),
483                    auth_context.clone(),
484                    forge_core::function::RequestMetadata::new(),
485                );
486
487                // Normalize args
488                let normalized_args = match args {
489                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
490                        serde_json::Value::Null
491                    }
492                    v => v.clone(),
493                };
494
495                let data = handler(&ctx, normalized_args).await?;
496
497                // Create read set from compile-time extracted table dependencies
498                let mut read_set = ReadSet::new();
499
500                if info.table_dependencies.is_empty() {
501                    // Fallback: no tables extracted (dynamic SQL)
502                    // Use naming convention as last resort
503                    let table_name = Self::extract_table_name(query_name);
504                    read_set.add_table(&table_name);
505                    tracing::debug!(
506                        query = %query_name,
507                        fallback_table = %table_name,
508                        "No compile-time table dependencies found, using naming convention fallback"
509                    );
510                } else {
511                    // Use compile-time extracted tables
512                    for table in info.table_dependencies {
513                        read_set.add_table(*table);
514                    }
515                    tracing::debug!(
516                        query = %query_name,
517                        tables = ?info.table_dependencies,
518                        "Using compile-time table dependencies"
519                    );
520                }
521
522                Ok((data, read_set))
523            }
524            Some(_) => Err(forge_core::ForgeError::Validation(format!(
525                "'{}' is not a query",
526                query_name
527            ))),
528            None => Err(forge_core::ForgeError::Validation(format!(
529                "Query '{}' not found",
530                query_name
531            ))),
532        }
533    }
534
535    /// Compute a hash of the result for delta detection.
536    fn compute_hash(data: &serde_json::Value) -> String {
537        use std::collections::hash_map::DefaultHasher;
538        use std::hash::{Hash, Hasher};
539
540        let json = serde_json::to_string(data).unwrap_or_default();
541        let mut hasher = DefaultHasher::new();
542        json.hash(&mut hasher);
543        format!("{:x}", hasher.finish())
544    }
545
546    /// Start the reactor (runs the change listener and invalidation loop).
547    pub async fn start(&self) -> forge_core::Result<()> {
548        let listener = self.change_listener.clone();
549        let invalidation_engine = self.invalidation_engine.clone();
550        let active_subscriptions = self.active_subscriptions.clone();
551        let job_subscriptions = self.job_subscriptions.clone();
552        let workflow_subscriptions = self.workflow_subscriptions.clone();
553        let session_server = self.session_server.clone();
554        let registry = self.registry.clone();
555        let db_pool = self.db_pool.clone();
556        let mut shutdown_rx = self.shutdown_tx.subscribe();
557        let max_restarts = self.max_listener_restarts;
558        let base_delay_ms = self.listener_restart_delay_ms;
559
560        // Subscribe to changes
561        let mut change_rx = listener.subscribe();
562
563        // Main reactor loop
564        tokio::spawn(async move {
565            tracing::info!("Reactor started, listening for changes");
566
567            let mut restart_count: u32 = 0;
568            let (listener_error_tx, mut listener_error_rx) = mpsc::channel::<String>(1);
569
570            // Start initial listener
571            let listener_clone = listener.clone();
572            let error_tx = listener_error_tx.clone();
573            let mut listener_handle = Some(tokio::spawn(async move {
574                if let Err(e) = listener_clone.run().await {
575                    let _ = error_tx.send(format!("Change listener error: {}", e)).await;
576                }
577            }));
578
579            loop {
580                tokio::select! {
581                    result = change_rx.recv() => {
582                        match result {
583                            Ok(change) => {
584                                Self::handle_change(
585                                    &change,
586                                    &invalidation_engine,
587                                    &active_subscriptions,
588                                    &job_subscriptions,
589                                    &workflow_subscriptions,
590                                    &session_server,
591                                    &registry,
592                                    &db_pool,
593                                ).await;
594                            }
595                            Err(broadcast::error::RecvError::Lagged(n)) => {
596                                tracing::warn!("Reactor lagged by {} messages", n);
597                            }
598                            Err(broadcast::error::RecvError::Closed) => {
599                                tracing::info!("Change channel closed");
600                                break;
601                            }
602                        }
603                    }
604                    Some(error_msg) = listener_error_rx.recv() => {
605                        tracing::error!("Listener failed: {}", error_msg);
606
607                        if restart_count >= max_restarts {
608                            tracing::error!(
609                                "Listener failed {} times, giving up. Real-time updates disabled.",
610                                restart_count
611                            );
612                            break;
613                        }
614
615                        restart_count += 1;
616                        let delay = base_delay_ms * 2u64.saturating_pow(restart_count - 1);
617                        tracing::warn!(
618                            "Restarting listener in {}ms (attempt {}/{})",
619                            delay, restart_count, max_restarts
620                        );
621
622                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
623
624                        // Restart listener
625                        let listener_clone = listener.clone();
626                        let error_tx = listener_error_tx.clone();
627                        if let Some(handle) = listener_handle.take() {
628                            handle.abort();
629                        }
630                        change_rx = listener.subscribe();
631                        listener_handle = Some(tokio::spawn(async move {
632                            if let Err(e) = listener_clone.run().await {
633                                let _ = error_tx.send(format!("Change listener error: {}", e)).await;
634                            }
635                        }));
636                    }
637                    _ = shutdown_rx.recv() => {
638                        tracing::info!("Reactor shutting down");
639                        break;
640                    }
641                }
642            }
643
644            if let Some(handle) = listener_handle {
645                handle.abort();
646            }
647        });
648
649        Ok(())
650    }
651
652    /// Handle a database change event.
653    #[allow(clippy::too_many_arguments)]
654    async fn handle_change(
655        change: &Change,
656        invalidation_engine: &Arc<InvalidationEngine>,
657        active_subscriptions: &Arc<RwLock<HashMap<SubscriptionId, ActiveSubscription>>>,
658        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
659        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
660        session_server: &Arc<SessionServer>,
661        registry: &FunctionRegistry,
662        db_pool: &sqlx::PgPool,
663    ) {
664        tracing::debug!(table = %change.table, op = ?change.operation, row_id = ?change.row_id, "Processing change");
665
666        // Handle job/workflow table changes first
667        match change.table.as_str() {
668            "forge_jobs" => {
669                if let Some(job_id) = change.row_id {
670                    Self::handle_job_change(job_id, job_subscriptions, session_server, db_pool)
671                        .await;
672                }
673                return; // Don't process through query invalidation
674            }
675            "forge_workflow_runs" => {
676                if let Some(workflow_id) = change.row_id {
677                    Self::handle_workflow_change(
678                        workflow_id,
679                        workflow_subscriptions,
680                        session_server,
681                        db_pool,
682                    )
683                    .await;
684                }
685                return; // Don't process through query invalidation
686            }
687            "forge_workflow_steps" => {
688                // For step changes, need to look up the parent workflow_id
689                if let Some(step_id) = change.row_id {
690                    Self::handle_workflow_step_change(
691                        step_id,
692                        workflow_subscriptions,
693                        session_server,
694                        db_pool,
695                    )
696                    .await;
697                }
698                return; // Don't process through query invalidation
699            }
700            _ => {}
701        }
702
703        // Process change through invalidation engine for query subscriptions
704        invalidation_engine.process_change(change.clone()).await;
705
706        // Flush all pending invalidations immediately for real-time updates
707        // Note: A more sophisticated approach would use the invalidation engine's run loop
708        // with proper debouncing for high-frequency changes
709        let invalidated = invalidation_engine.flush_all().await;
710
711        if invalidated.is_empty() {
712            return;
713        }
714
715        tracing::debug!(count = invalidated.len(), "Invalidating subscriptions");
716
717        // Collect subscription info under read lock, then release before async operations
718        let subs_to_process: Vec<_> = {
719            let subscriptions = active_subscriptions.read().await;
720            invalidated
721                .iter()
722                .filter_map(|sub_id| {
723                    subscriptions.get(sub_id).map(|active| {
724                        (
725                            *sub_id,
726                            active.session_id,
727                            active.client_sub_id.clone(),
728                            active.query_name.clone(),
729                            active.args.clone(),
730                            active.last_result_hash.clone(),
731                            active.auth_context.clone(),
732                        )
733                    })
734                })
735                .collect()
736        };
737
738        // Track updates to apply after processing
739        let mut updates: Vec<(SubscriptionId, String)> = Vec::new();
740
741        // Re-execute invalidated queries and push updates (without holding locks)
742        for (sub_id, session_id, client_sub_id, query_name, args, last_hash, auth_context) in
743            subs_to_process
744        {
745            // Re-execute the query
746            match Self::execute_query_static(registry, db_pool, &query_name, &args, &auth_context)
747                .await
748            {
749                Ok((new_data, _read_set)) => {
750                    let new_hash = Self::compute_hash(&new_data);
751
752                    // Only push if data changed
753                    if last_hash.as_ref() != Some(&new_hash) {
754                        // Send updated data to client using client_sub_id for SSE target matching
755                        let message = RealtimeMessage::Data {
756                            subscription_id: client_sub_id.clone(),
757                            data: new_data,
758                        };
759
760                        if let Err(e) = session_server.send_to_session(session_id, message).await {
761                            tracing::warn!(client_id = %client_sub_id, "Failed to send update: {}", e);
762                        } else {
763                            tracing::debug!(client_id = %client_sub_id, "Pushed update to client");
764                            // Track the hash update
765                            updates.push((sub_id, new_hash));
766                        }
767                    }
768                }
769                Err(e) => {
770                    tracing::error!(client_id = %client_sub_id, "Failed to re-execute query: {}", e);
771                }
772            }
773        }
774
775        // Update hashes for successfully sent updates
776        if !updates.is_empty() {
777            let mut subscriptions = active_subscriptions.write().await;
778            for (sub_id, new_hash) in updates {
779                if let Some(active) = subscriptions.get_mut(&sub_id) {
780                    active.last_result_hash = Some(new_hash);
781                }
782            }
783        }
784    }
785
786    /// Handle a job table change event.
787    async fn handle_job_change(
788        job_id: Uuid,
789        job_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<JobSubscription>>>>,
790        session_server: &Arc<SessionServer>,
791        db_pool: &sqlx::PgPool,
792    ) {
793        let subs = job_subscriptions.read().await;
794        let subscribers = match subs.get(&job_id) {
795            Some(s) if !s.is_empty() => s.clone(),
796            _ => return, // No subscribers for this job
797        };
798        drop(subs); // Release lock before async operations
799
800        // Fetch latest job state
801        let job_data = match Self::fetch_job_data_static(job_id, db_pool).await {
802            Ok(data) => data,
803            Err(e) => {
804                tracing::warn!(%job_id, "Failed to fetch job data: {}", e);
805                return;
806            }
807        };
808
809        // Push to all subscribers
810        for sub in subscribers {
811            let message = RealtimeMessage::JobUpdate {
812                client_sub_id: sub.client_sub_id.clone(),
813                job: job_data.clone(),
814            };
815
816            if let Err(e) = session_server
817                .send_to_session(sub.session_id, message)
818                .await
819            {
820                // Debug level because this commonly happens when session disconnects (page refresh)
821                tracing::debug!(
822                    %job_id,
823                    client_id = %sub.client_sub_id,
824                    "Failed to send job update (session likely disconnected): {}",
825                    e
826                );
827            } else {
828                tracing::debug!(
829                    %job_id,
830                    client_id = %sub.client_sub_id,
831                    "Pushed job update to client"
832                );
833            }
834        }
835    }
836
837    /// Handle a workflow table change event.
838    async fn handle_workflow_change(
839        workflow_id: Uuid,
840        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
841        session_server: &Arc<SessionServer>,
842        db_pool: &sqlx::PgPool,
843    ) {
844        let subs = workflow_subscriptions.read().await;
845        let subscribers = match subs.get(&workflow_id) {
846            Some(s) if !s.is_empty() => s.clone(),
847            _ => return, // No subscribers for this workflow
848        };
849        drop(subs); // Release lock before async operations
850
851        // Fetch latest workflow + steps state
852        let workflow_data = match Self::fetch_workflow_data_static(workflow_id, db_pool).await {
853            Ok(data) => data,
854            Err(e) => {
855                tracing::warn!(%workflow_id, "Failed to fetch workflow data: {}", e);
856                return;
857            }
858        };
859
860        // Push to all subscribers
861        for sub in subscribers {
862            let message = RealtimeMessage::WorkflowUpdate {
863                client_sub_id: sub.client_sub_id.clone(),
864                workflow: workflow_data.clone(),
865            };
866
867            if let Err(e) = session_server
868                .send_to_session(sub.session_id, message)
869                .await
870            {
871                // Debug level because this commonly happens when session disconnects (page refresh)
872                tracing::debug!(
873                    %workflow_id,
874                    client_id = %sub.client_sub_id,
875                    "Failed to send workflow update (session likely disconnected): {}",
876                    e
877                );
878            } else {
879                tracing::debug!(
880                    %workflow_id,
881                    client_id = %sub.client_sub_id,
882                    "Pushed workflow update to client"
883                );
884            }
885        }
886    }
887
888    /// Handle a workflow step change event.
889    async fn handle_workflow_step_change(
890        step_id: Uuid,
891        workflow_subscriptions: &Arc<RwLock<HashMap<Uuid, Vec<WorkflowSubscription>>>>,
892        session_server: &Arc<SessionServer>,
893        db_pool: &sqlx::PgPool,
894    ) {
895        // Look up the workflow_run_id for this step
896        let workflow_id: Option<Uuid> = match sqlx::query_scalar(
897            "SELECT workflow_run_id FROM forge_workflow_steps WHERE id = $1",
898        )
899        .bind(step_id)
900        .fetch_optional(db_pool)
901        .await
902        {
903            Ok(id) => id,
904            Err(e) => {
905                tracing::warn!(%step_id, "Failed to look up workflow_run_id for step: {}", e);
906                return;
907            }
908        };
909
910        if let Some(wf_id) = workflow_id {
911            // Delegate to workflow change handler
912            Self::handle_workflow_change(wf_id, workflow_subscriptions, session_server, db_pool)
913                .await;
914        }
915    }
916
917    /// Static version of fetch_job_data for use in handle_change.
918    #[allow(clippy::type_complexity)]
919    async fn fetch_job_data_static(
920        job_id: Uuid,
921        db_pool: &sqlx::PgPool,
922    ) -> forge_core::Result<JobData> {
923        let row: Option<(
924            String,
925            Option<i32>,
926            Option<String>,
927            Option<serde_json::Value>,
928            Option<String>,
929        )> = sqlx::query_as(
930            r#"
931                SELECT status, progress_percent, progress_message, output, last_error
932                FROM forge_jobs WHERE id = $1
933                "#,
934        )
935        .bind(job_id)
936        .fetch_optional(db_pool)
937        .await
938        .map_err(forge_core::ForgeError::Sql)?;
939
940        match row {
941            Some((status, progress_percent, progress_message, output, error)) => Ok(JobData {
942                job_id: job_id.to_string(),
943                status,
944                progress_percent,
945                progress_message,
946                output,
947                error,
948            }),
949            None => Err(forge_core::ForgeError::NotFound(format!(
950                "Job {} not found",
951                job_id
952            ))),
953        }
954    }
955
956    /// Static version of fetch_workflow_data for use in handle_change.
957    #[allow(clippy::type_complexity)]
958    async fn fetch_workflow_data_static(
959        workflow_id: Uuid,
960        db_pool: &sqlx::PgPool,
961    ) -> forge_core::Result<WorkflowData> {
962        let row: Option<(
963            String,
964            Option<String>,
965            Option<serde_json::Value>,
966            Option<String>,
967        )> = sqlx::query_as(
968            r#"
969                SELECT status, current_step, output, error
970                FROM forge_workflow_runs WHERE id = $1
971                "#,
972        )
973        .bind(workflow_id)
974        .fetch_optional(db_pool)
975        .await
976        .map_err(forge_core::ForgeError::Sql)?;
977
978        let (status, current_step, output, error) = match row {
979            Some(r) => r,
980            None => {
981                return Err(forge_core::ForgeError::NotFound(format!(
982                    "Workflow {} not found",
983                    workflow_id
984                )));
985            }
986        };
987
988        let step_rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
989            r#"
990            SELECT step_name, status, error
991            FROM forge_workflow_steps
992            WHERE workflow_run_id = $1
993            ORDER BY started_at ASC NULLS LAST
994            "#,
995        )
996        .bind(workflow_id)
997        .fetch_all(db_pool)
998        .await
999        .map_err(forge_core::ForgeError::Sql)?;
1000
1001        let steps = step_rows
1002            .into_iter()
1003            .map(|(name, status, error)| WorkflowStepData {
1004                name,
1005                status,
1006                error,
1007            })
1008            .collect();
1009
1010        Ok(WorkflowData {
1011            workflow_id: workflow_id.to_string(),
1012            status,
1013            current_step,
1014            steps,
1015            output,
1016            error,
1017        })
1018    }
1019
1020    /// Static version of execute_query for use in async context.
1021    async fn execute_query_static(
1022        registry: &FunctionRegistry,
1023        db_pool: &sqlx::PgPool,
1024        query_name: &str,
1025        args: &serde_json::Value,
1026        auth_context: &forge_core::function::AuthContext,
1027    ) -> forge_core::Result<(serde_json::Value, ReadSet)> {
1028        match registry.get(query_name) {
1029            Some(FunctionEntry::Query { info, handler }) => {
1030                let ctx = forge_core::function::QueryContext::new(
1031                    db_pool.clone(),
1032                    auth_context.clone(),
1033                    forge_core::function::RequestMetadata::new(),
1034                );
1035
1036                let normalized_args = match args {
1037                    v if v.is_object() && v.as_object().unwrap().is_empty() => {
1038                        serde_json::Value::Null
1039                    }
1040                    v => v.clone(),
1041                };
1042
1043                let data = handler(&ctx, normalized_args).await?;
1044
1045                // Create read set from compile-time extracted table dependencies
1046                let mut read_set = ReadSet::new();
1047
1048                if info.table_dependencies.is_empty() {
1049                    // Fallback for dynamic SQL
1050                    let table_name = Self::extract_table_name(query_name);
1051                    read_set.add_table(&table_name);
1052                    tracing::debug!(
1053                        query = %query_name,
1054                        fallback_table = %table_name,
1055                        "No compile-time table dependencies found (static), using naming convention fallback"
1056                    );
1057                } else {
1058                    for table in info.table_dependencies {
1059                        read_set.add_table(*table);
1060                    }
1061                    tracing::debug!(
1062                        query = %query_name,
1063                        tables = ?info.table_dependencies,
1064                        "Using compile-time table dependencies (static)"
1065                    );
1066                }
1067
1068                Ok((data, read_set))
1069            }
1070            _ => Err(forge_core::ForgeError::Validation(format!(
1071                "Query '{}' not found or not a query",
1072                query_name
1073            ))),
1074        }
1075    }
1076
/// Guess a table name from a query name by dropping a leading verb.
///
/// If `query_name` starts with `get_`, `list_`, `find_` or `fetch_`, that
/// prefix is removed and the remainder is returned (which may be empty).
/// Any other name is returned unchanged.
fn extract_table_name(query_name: &str) -> String {
    // Verb prefixes recognised by the naming convention, tried in order.
    const VERB_PREFIXES: [&str; 4] = ["get_", "list_", "find_", "fetch_"];

    VERB_PREFIXES
        .iter()
        .find_map(|prefix| query_name.strip_prefix(*prefix))
        .unwrap_or(query_name)
        .to_string()
}
1091
1092    /// Stop the reactor.
1093    pub fn stop(&self) {
1094        let _ = self.shutdown_tx.send(());
1095        self.change_listener.stop();
1096    }
1097
1098    /// Get reactor statistics.
1099    pub async fn stats(&self) -> ReactorStats {
1100        let session_stats = self.session_server.stats().await;
1101        let inv_stats = self.invalidation_engine.stats().await;
1102
1103        ReactorStats {
1104            connections: session_stats.connections,
1105            subscriptions: session_stats.subscriptions,
1106            pending_invalidations: inv_stats.pending_subscriptions,
1107            listener_running: self.change_listener.is_running(),
1108        }
1109    }
1110}
1111
/// Reactor statistics.
///
/// A plain snapshot of counters and flags. Two snapshots compare equal when
/// all four fields match, which allows `assert_eq!` on whole snapshots.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReactorStats {
    /// Connection count as reported by the session server.
    pub connections: usize,
    /// Subscription count as reported by the session server.
    pub subscriptions: usize,
    /// Pending subscription count as reported by the invalidation engine.
    pub pending_invalidations: usize,
    /// Whether the change listener reported itself as running.
    pub listener_running: bool,
}
1120
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the expected listener channel,
    /// debounce interval and listener restart policy.
    #[test]
    fn test_reactor_config_default() {
        let ReactorConfig {
            listener,
            invalidation,
            max_listener_restarts,
            listener_restart_delay_ms,
            ..
        } = ReactorConfig::default();

        assert_eq!(listener.channel, "forge_changes");
        assert_eq!(invalidation.debounce_ms, 50);
        assert_eq!(max_listener_restarts, 5);
        assert_eq!(listener_restart_delay_ms, 1000);
    }

    /// Equal JSON values hash identically; different values hash differently.
    #[test]
    fn test_compute_hash() {
        let original = serde_json::json!({"name": "test"});
        let identical = serde_json::json!({"name": "test"});
        let changed = serde_json::json!({"name": "different"});

        let baseline = Reactor::compute_hash(&original);

        assert_eq!(baseline, Reactor::compute_hash(&identical));
        assert_ne!(baseline, Reactor::compute_hash(&changed));
    }
}