// seesaw_core/job_executor.rs

//! `JobExecutor`: event-processing and handler-execution logic extracted from
//! the workers.
//!
//! This consolidates the logic that was previously embedded in `EventWorker`
//! and `HandlerWorker`.
5
6use anyhow::Result;
7use std::any::{Any, TypeId};
8use std::sync::Arc;
9use tokio::time::timeout;
10use tracing::{info, warn};
11use uuid::Uuid;
12
13use crate::aggregator::AggregatorRegistry;
14use crate::handler::{Context, DlqTerminalInfo, EventOutput, GlobalDlqMapper, Handler};
15use crate::handler_queue::HandlerQueue;
16use crate::handler_registry::HandlerRegistry;
17use crate::types::{
18    EmittedEvent, EventWorkerConfig, HandlerIntent, HandlerWorkerConfig,
19    IntentCommit, PersistedEvent, ProjectionFailure, QueuedHandler, NAMESPACE_SEESAW,
20};
21use crate::upcaster::UpcasterRegistry;
22
23/// Extracted execution logic for events and handlers.
24///
25/// This struct consolidates the pure execution logic that was previously
26/// scattered across EventWorker and HandlerWorker implementations.
27pub struct JobExecutor<D>
28where
29    D: Send + Sync + 'static,
30{
31    deps: Arc<D>,
32    queue: Arc<dyn HandlerQueue>,
33    handlers: Arc<HandlerRegistry<D>>,
34    aggregator_registry: Arc<AggregatorRegistry>,
35    upcasters: Arc<UpcasterRegistry>,
36    global_dlq_mapper: Option<GlobalDlqMapper>,
37}
38
39impl<D> JobExecutor<D>
40where
41    D: Send + Sync + 'static,
42{
43    /// Create a new job executor.
44    pub fn new(
45        deps: Arc<D>,
46        queue: Arc<dyn HandlerQueue>,
47        handlers: Arc<HandlerRegistry<D>>,
48        aggregator_registry: Arc<AggregatorRegistry>,
49        upcasters: Arc<UpcasterRegistry>,
50        global_dlq_mapper: Option<GlobalDlqMapper>,
51    ) -> Self {
52        Self {
53            deps,
54            queue,
55            handlers,
56            aggregator_registry,
57            upcasters,
58            global_dlq_mapper,
59        }
60    }
61
62    /// Process a persisted event: create handler intents and run projections.
63    ///
64    /// All matching handlers become queued handler intents. Only projections
65    /// (observers) run inline during event processing.
66    ///
67    /// Returns an [`IntentCommit`] that the caller enqueues via `HandlerQueue::enqueue`.
68    /// Process an event: decode, route to handlers, build intents, run projections.
69    ///
70    /// When `skip_projections` is true, projections are not executed. This is
71    /// used for ephemeral events which route through handlers but skip
72    /// persistence, aggregators, and projections.
73    pub async fn process_event(
74        &self,
75        event: &PersistedEvent,
76        _config: &EventWorkerConfig,
77    ) -> Result<IntentCommit> {
78        self.process_event_inner(event, _config, false).await
79    }
80
81    pub async fn process_event_inner(
82        &self,
83        event: &PersistedEvent,
84        _config: &EventWorkerConfig,
85        skip_projections: bool,
86    ) -> Result<IntentCommit> {
87        info!(
88            "Processing event: type={}, correlation={}, position={}",
89            event.event_type, event.correlation_id, event.position
90        );
91
92        // 1. Decode event via codec (prefer ephemeral sidecar if present)
93        let (typed_event, event_type_id) = self.decode_event(&event.event_type, &event.payload, event.ephemeral.as_ref())?;
94
95        // 2. Route to matching handlers
96        let matching_handlers: Vec<_> = self
97            .handlers
98            .all()
99            .into_iter()
100            .filter(|h| h.can_handle(event_type_id))
101            .collect();
102
103        // 3. Call describe() on ALL handlers that have it (not just matching ones).
104        //
105        // Every event in a correlation may update aggregate state, so we
106        // re-run describe for every handler with a describe closure. This
107        // ensures that when handler A emits EventB (which updates aggregates),
108        // handler A's description is refreshed when EventB is processed —
109        // even though handler A doesn't match EventB.
110        let mut handler_descriptions = std::collections::HashMap::new();
111        for handler in self.handlers.all() {
112            if handler.has_describe() {
113                let ctx = self.make_context(
114                    handler.id.clone(),
115                    format!("describe::{}", handler.id),
116                    event.correlation_id,
117                    event.event_id,
118                    event.parent_id,
119                );
120                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
121                    handler.call_describe(&ctx)
122                })) {
123                    Ok(Some(value)) => {
124                        handler_descriptions.insert(handler.id.clone(), value);
125                    }
126                    Ok(None) => {}
127                    Err(_) => {
128                        tracing::warn!(
129                            handler_id = %handler.id,
130                            "describe() panicked, skipping"
131                        );
132                    }
133                }
134            }
135        }
136
137        // 4. Create queued handler intents for ALL matching handlers
138        let hops = event.metadata.get("_hops")
139            .and_then(|v| v.as_u64())
140            .unwrap_or(0) as i32;
141        let mut handler_intents = Vec::new();
142        for handler in &matching_handlers {
143            let execute_at = match handler.delay {
144                Some(delay) => {
145                    chrono::Utc::now()
146                        + chrono::Duration::from_std(delay)
147                            .map_err(|_| anyhow::anyhow!("invalid handler delay"))?
148                }
149                None => chrono::Utc::now(),
150            };
151            let timeout_seconds = handler
152                .timeout
153                .map(|d| d.as_secs() as i32)
154                .unwrap_or(900)
155                .max(1);
156            handler_intents.push(HandlerIntent {
157                handler_id: handler.id.clone(),
158                parent_event_id: event.parent_id,
159                execute_at,
160                timeout_seconds,
161                max_attempts: handler.max_attempts as i32,
162                priority: handler.priority.unwrap_or(10),
163                hops,
164            });
165        }
166
167        // 5. Execute projections sequentially (projections are observers, not handlers)
168        //    Skipped for ephemeral events.
169        let mut projection_failures = Vec::new();
170
171        let projections = if skip_projections { Vec::new() } else { self.handlers.projections() };
172        for projection in &projections {
173            let any_event = crate::handler::AnyEvent {
174                value: typed_event.clone(),
175                type_id: event_type_id,
176            };
177            let idempotency_key = Uuid::new_v5(
178                &NAMESPACE_SEESAW,
179                format!("{}-{}", event.event_id, projection.id).as_bytes(),
180            )
181            .to_string();
182            let ctx = self.make_context(
183                projection.id.clone(),
184                idempotency_key,
185                event.correlation_id,
186                event.event_id,
187                event.parent_id,
188            );
189
190            if let Err(error) = (projection.handler)(any_event, ctx).await {
191                let error_string = error.to_string();
192                warn!(
193                    "Projection handler failed: event_id={}, projection_id={}, error={}",
194                    event.event_id, projection.id, error_string
195                );
196                projection_failures.push(ProjectionFailure {
197                    handler_id: projection.id.clone(),
198                    error: error_string,
199                    reason: "projection_failed".to_string(),
200                    attempts: 1,
201                });
202            }
203        }
204
205        // 6. Return IntentCommit
206        Ok(IntentCommit {
207            event_id: event.event_id,
208            correlation_id: event.correlation_id,
209            event_type: event.event_type.clone(),
210            event_payload: event.payload.clone(),
211            checkpoint: event.position,
212            intents: handler_intents,
213            projection_failures,
214            handler_descriptions,
215            park: None,
216        })
217    }
218
219    /// Execute a queued handler.
220    pub async fn execute_handler(
221        &self,
222        execution: QueuedHandler,
223        config: &HandlerWorkerConfig,
224    ) -> Result<HandlerResult> {
225        info!(
226            "Processing handler: handler_id={}, workflow={}, priority={}, attempt={}/{}",
227            execution.handler_id,
228            execution.correlation_id,
229            execution.priority,
230            execution.attempts,
231            execution.max_attempts
232        );
233
234        // 1. Find handler by ID
235        let Some(handler) = self.handlers.find_by_id(&execution.handler_id) else {
236            let error = format!(
237                "No handler registered for id '{}'",
238                execution.handler_id
239            );
240            warn!("{}", error);
241            return Ok(HandlerResult {
242                status: if execution.attempts >= execution.max_attempts {
243                    HandlerStatus::Failed {
244                        error: error.clone(),
245                        attempts: execution.attempts,
246                    }
247                } else {
248                    HandlerStatus::Retry {
249                        error,
250                        attempts: execution.attempts,
251                    }
252                },
253                emitted_events: Vec::new(),
254                result: serde_json::json!({}),
255
256                log_entries: Vec::new(),
257            });
258        };
259
260        let (typed_event, type_id) =
261            self.decode_event(&execution.event_type, &execution.event_payload, execution.ephemeral.as_ref())?;
262
263        let idempotency_key = Uuid::new_v5(
264            &NAMESPACE_SEESAW,
265            format!("{}-{}", execution.event_id, execution.handler_id).as_bytes(),
266        )
267        .to_string();
268
269        let journal_entries = self
270            .queue
271            .load_journal(&handler.id, execution.event_id)
272            .await?;
273
274        let ctx = self
275            .make_context(
276                handler.id.clone(),
277                idempotency_key,
278                execution.correlation_id,
279                execution.event_id,
280                execution.parent_event_id,
281            )
282            .with_journal(self.queue.clone(), journal_entries);
283
284        // 2. Execute with timeout
285        let timeout_duration = if execution.timeout_seconds > 0 {
286            std::time::Duration::from_secs(execution.timeout_seconds as u64)
287        } else {
288            config.default_timeout
289        };
290
291        let handler_fut = handler.make_handler_future(typed_event.clone(), type_id, ctx.clone());
292        let result = timeout(timeout_duration, handler_fut)
293        .await;
294
295        // 4. Handle execution result
296        match result {
297            Ok(Ok(emitted_raw)) => {
298                // 5. Serialize emitted events
299                let emitted_events = self.serialize_emitted_events(
300                    emitted_raw,
301                    &execution,
302                )?;
303
304                info!("Handler completed successfully: {}", execution.handler_id);
305                Ok(HandlerResult {
306                    status: HandlerStatus::Success,
307                    emitted_events,
308                    result: serde_json::json!({ "status": "ok" }),
309    
310                    log_entries: ctx.logger.drain(),
311                })
312            }
313            Ok(Err(e)) => {
314                warn!(
315                    "Handler failed: {} (attempt {}/{}): {}",
316                    execution.handler_id, execution.attempts, execution.max_attempts, e
317                );
318
319                let status = if execution.attempts >= execution.max_attempts {
320                    HandlerStatus::Failed {
321                        error: e.to_string(),
322                        attempts: execution.attempts,
323                    }
324                } else {
325                    HandlerStatus::Retry {
326                        error: e.to_string(),
327                        attempts: execution.attempts,
328                    }
329                };
330
331                // Try to build DLQ terminal event
332                let emitted_events =
333                    if execution.attempts >= execution.max_attempts
334                        && (handler.dlq_terminal_mapper.is_some() || self.global_dlq_mapper.is_some())
335                    {
336                        self.build_dlq_terminal_event(
337                            &handler,
338                            typed_event,
339                            type_id,
340                            &execution,
341                            "failed",
342                            e.to_string(),
343                        )?
344                    } else {
345                        Vec::new()
346                    };
347
348                Ok(HandlerResult {
349                    status,
350                    emitted_events,
351                    result: serde_json::json!({}),
352    
353                    log_entries: ctx.logger.drain(),
354                })
355            }
356            Err(_) => {
357                warn!("Handler timed out: {}", execution.handler_id);
358
359                let timeout_error = "Handler execution timed out".to_string();
360
361                let status = if execution.attempts >= execution.max_attempts {
362                    HandlerStatus::Failed {
363                        error: timeout_error.clone(),
364                        attempts: execution.attempts,
365                    }
366                } else {
367                    HandlerStatus::Retry {
368                        error: timeout_error.clone(),
369                        attempts: execution.attempts,
370                    }
371                };
372
373                // Try to build DLQ terminal event for timeout
374                let emitted_events = if execution.attempts >= execution.max_attempts
375                    && (handler.dlq_terminal_mapper.is_some() || self.global_dlq_mapper.is_some())
376                {
377                    self.build_dlq_terminal_event(
378                        &handler,
379                        typed_event,
380                        type_id,
381                        &execution,
382                        "timeout",
383                        timeout_error,
384                    )?
385                } else {
386                    Vec::new()
387                };
388
389                Ok(HandlerResult {
390                    status,
391                    emitted_events,
392                    result: serde_json::json!({}),
393    
394                    log_entries: ctx.logger.drain(),
395                })
396            }
397        }
398    }
399
400    /// Run startup handlers.
401    pub async fn run_startup_handlers(&self) -> Result<()> {
402        for h in self.handlers.all() {
403            if h.started.is_none() {
404                continue;
405            }
406
407            let ctx = self.make_context(
408                h.id.clone(),
409                format!("startup::{}", h.id),
410                Uuid::nil(),
411                Uuid::nil(),
412                None,
413            );
414
415            h.call_started(ctx)
416                .await
417                .map_err(|e| anyhow::anyhow!("startup handler '{}' failed: {}", h.id, e))?;
418        }
419        Ok(())
420    }
421
422    /// Get handler registry reference.
423    pub fn handler_registry(&self) -> &Arc<HandlerRegistry<D>> {
424        &self.handlers
425    }
426
427    // --- Private helpers ---
428
429    fn make_context(
430        &self,
431        handler_id: String,
432        idempotency_key: String,
433        correlation_id: Uuid,
434        event_id: Uuid,
435        parent_event_id: Option<Uuid>,
436    ) -> Context<D> {
437        Context::new(
438            handler_id,
439            idempotency_key,
440            correlation_id,
441            event_id,
442            parent_event_id,
443            self.deps.clone(),
444        )
445        .with_aggregator_registry(self.aggregator_registry.clone())
446    }
447
448    fn decode_event(
449        &self,
450        event_type: &str,
451        payload: &serde_json::Value,
452        ephemeral: Option<&Arc<dyn Any + Send + Sync>>,
453    ) -> Result<(Arc<dyn Any + Send + Sync>, TypeId)> {
454        // Fast path: if the ephemeral sidecar is present and a codec is registered,
455        // use the original typed event directly (preserves #[serde(skip)] fields).
456        // Skip when upcasters exist — the ephemeral holds the pre-upcasted shape.
457        if let Some(typed) = ephemeral {
458            if self.upcasters.is_empty() {
459                if let Some(codec) = self.handlers.find_codec_by_durable_name(event_type) {
460                    if (**typed).type_id() == codec.type_id {
461                        return Ok((Arc::clone(typed), codec.type_id));
462                    }
463                }
464            }
465        }
466
467        // Slow path: deserialize from JSON (replay, hydration, or no ephemeral).
468        // Apply upcasters before decoding.
469        let upcasted = self.upcasters.upcast(event_type, 0, payload.clone())?;
470
471        let codec = self.handlers.find_codec_by_durable_name(event_type);
472
473        if let Some(codec) = codec {
474            let typed = (codec.decode)(&upcasted)?;
475            Ok((typed, codec.type_id))
476        } else {
477            warn!(
478                event_type = %event_type,
479                "No codec registered for event type — falling back to raw JSON. \
480                 If this event was emitted by a queued handler, ensure the \
481                 receiving handler is registered with the engine."
482            );
483            Ok((Arc::new(upcasted), TypeId::of::<serde_json::Value>()))
484        }
485    }
486
487    pub(crate) fn serialize_emitted_events(
488        &self,
489        emitted: Vec<EventOutput>,
490        execution: &QueuedHandler,
491    ) -> Result<Vec<EmittedEvent>> {
492        let mut result = Vec::with_capacity(emitted.len());
493        for output in emitted {
494            // Auto-register codec so the event can be decoded in the next dispatch cycle
495            if let Some(codec) = &output.codec {
496                self.handlers.register_codec(codec.clone());
497            }
498
499            result.push(EmittedEvent {
500                durable_name: output.durable_name,
501                event_prefix: output.event_prefix,
502                persistent: output.persistent,
503                payload: output.payload,
504                handler_id: Some(execution.handler_id.clone()),
505                ephemeral: output.ephemeral,
506            });
507        }
508
509        Ok(result)
510    }
511
512    fn build_dlq_terminal_event(
513        &self,
514        handler: &Handler<D>,
515        source_event: Arc<dyn Any + Send + Sync>,
516        source_type_id: TypeId,
517        execution: &QueuedHandler,
518        reason: &str,
519        error: String,
520    ) -> Result<Vec<EmittedEvent>> {
521        let Some(mapper) = handler.dlq_terminal_mapper.as_ref() else {
522            // Global fallback
523            if let Some(global) = self.global_dlq_mapper.as_ref() {
524                let mut emitted = global(DlqTerminalInfo {
525                    handler_id: execution.handler_id.clone(),
526                    source_event_type: execution.event_type.clone(),
527                    source_event_id: execution.event_id,
528                    error,
529                    reason: reason.to_string(),
530                    attempts: execution.attempts,
531                    max_attempts: execution.max_attempts,
532                })?;
533                if emitted.handler_id.is_none() {
534                    emitted.handler_id = Some(execution.handler_id.clone());
535                }
536                return Ok(vec![emitted]);
537            }
538            return Ok(Vec::new());
539        };
540
541        let mut emitted = mapper(
542            source_event,
543            source_type_id,
544            DlqTerminalInfo {
545                handler_id: execution.handler_id.clone(),
546                source_event_type: execution.event_type.clone(),
547                source_event_id: execution.event_id,
548                error,
549                reason: reason.to_string(),
550                attempts: execution.attempts,
551                max_attempts: execution.max_attempts,
552            },
553        )?;
554
555        // Ensure handler_id is set for causal tracking
556        if emitted.handler_id.is_none() {
557            emitted.handler_id = Some(execution.handler_id.clone());
558        }
559
560        Ok(vec![emitted])
561    }
562}
563
564/// Result of handler execution.
565#[derive(Debug)]
566pub struct HandlerResult {
567    pub status: HandlerStatus,
568    pub emitted_events: Vec<EmittedEvent>,
569    pub result: serde_json::Value,
570    /// Log entries captured during handler execution.
571    pub log_entries: Vec<crate::types::LogEntry>,
572}
573
/// Classification of a single handler execution attempt.
#[derive(Debug)]
pub enum HandlerStatus {
    /// The handler returned `Ok` and its outputs were serialized.
    Success,
    /// The attempt failed with no attempts remaining (`attempts >= max_attempts`).
    Failed {
        /// Error message from the failed attempt.
        error: String,
        /// Attempt count recorded for the execution.
        attempts: i32,
    },
    /// The attempt failed but attempts remain (`attempts < max_attempts`);
    /// presumably the caller re-queues it — confirm against the worker.
    Retry {
        /// Error message from the failed attempt.
        error: String,
        /// Attempt count recorded for the execution.
        attempts: i32,
    },
    /// NOTE(review): `JobExecutor::execute_handler` reports timeouts as
    /// `Retry`/`Failed` rather than this variant; confirm whether anything
    /// still constructs it.
    Timeout,
}