1use anyhow::Result;
7use std::any::{Any, TypeId};
8use std::sync::Arc;
9use tokio::time::timeout;
10use tracing::{info, warn};
11use uuid::Uuid;
12
13use crate::aggregator::AggregatorRegistry;
14use crate::handler::{Context, DlqTerminalInfo, EventOutput, GlobalDlqMapper, Handler};
15use crate::handler_queue::HandlerQueue;
16use crate::handler_registry::HandlerRegistry;
17use crate::types::{
18 EmittedEvent, EventWorkerConfig, HandlerIntent, HandlerWorkerConfig,
19 IntentCommit, PersistedEvent, ProjectionFailure, QueuedHandler, NAMESPACE_SEESAW,
20};
21use crate::upcaster::UpcasterRegistry;
22
23pub struct JobExecutor<D>
28where
29 D: Send + Sync + 'static,
30{
31 deps: Arc<D>,
32 queue: Arc<dyn HandlerQueue>,
33 handlers: Arc<HandlerRegistry<D>>,
34 aggregator_registry: Arc<AggregatorRegistry>,
35 upcasters: Arc<UpcasterRegistry>,
36 global_dlq_mapper: Option<GlobalDlqMapper>,
37}
38
39impl<D> JobExecutor<D>
40where
41 D: Send + Sync + 'static,
42{
43 pub fn new(
45 deps: Arc<D>,
46 queue: Arc<dyn HandlerQueue>,
47 handlers: Arc<HandlerRegistry<D>>,
48 aggregator_registry: Arc<AggregatorRegistry>,
49 upcasters: Arc<UpcasterRegistry>,
50 global_dlq_mapper: Option<GlobalDlqMapper>,
51 ) -> Self {
52 Self {
53 deps,
54 queue,
55 handlers,
56 aggregator_registry,
57 upcasters,
58 global_dlq_mapper,
59 }
60 }
61
62 pub async fn process_event(
74 &self,
75 event: &PersistedEvent,
76 _config: &EventWorkerConfig,
77 ) -> Result<IntentCommit> {
78 self.process_event_inner(event, _config, false).await
79 }
80
81 pub async fn process_event_inner(
82 &self,
83 event: &PersistedEvent,
84 _config: &EventWorkerConfig,
85 skip_projections: bool,
86 ) -> Result<IntentCommit> {
87 info!(
88 "Processing event: type={}, correlation={}, position={}",
89 event.event_type, event.correlation_id, event.position
90 );
91
92 let (typed_event, event_type_id) = self.decode_event(&event.event_type, &event.payload, event.ephemeral.as_ref())?;
94
95 let matching_handlers: Vec<_> = self
97 .handlers
98 .all()
99 .into_iter()
100 .filter(|h| h.can_handle(event_type_id))
101 .collect();
102
103 let mut handler_descriptions = std::collections::HashMap::new();
111 for handler in self.handlers.all() {
112 if handler.has_describe() {
113 let ctx = self.make_context(
114 handler.id.clone(),
115 format!("describe::{}", handler.id),
116 event.correlation_id,
117 event.event_id,
118 event.parent_id,
119 );
120 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
121 handler.call_describe(&ctx)
122 })) {
123 Ok(Some(value)) => {
124 handler_descriptions.insert(handler.id.clone(), value);
125 }
126 Ok(None) => {}
127 Err(_) => {
128 tracing::warn!(
129 handler_id = %handler.id,
130 "describe() panicked, skipping"
131 );
132 }
133 }
134 }
135 }
136
137 let hops = event.metadata.get("_hops")
139 .and_then(|v| v.as_u64())
140 .unwrap_or(0) as i32;
141 let mut handler_intents = Vec::new();
142 for handler in &matching_handlers {
143 let execute_at = match handler.delay {
144 Some(delay) => {
145 chrono::Utc::now()
146 + chrono::Duration::from_std(delay)
147 .map_err(|_| anyhow::anyhow!("invalid handler delay"))?
148 }
149 None => chrono::Utc::now(),
150 };
151 let timeout_seconds = handler
152 .timeout
153 .map(|d| d.as_secs() as i32)
154 .unwrap_or(900)
155 .max(1);
156 handler_intents.push(HandlerIntent {
157 handler_id: handler.id.clone(),
158 parent_event_id: event.parent_id,
159 execute_at,
160 timeout_seconds,
161 max_attempts: handler.max_attempts as i32,
162 priority: handler.priority.unwrap_or(10),
163 hops,
164 });
165 }
166
167 let mut projection_failures = Vec::new();
170
171 let projections = if skip_projections { Vec::new() } else { self.handlers.projections() };
172 for projection in &projections {
173 let any_event = crate::handler::AnyEvent {
174 value: typed_event.clone(),
175 type_id: event_type_id,
176 };
177 let idempotency_key = Uuid::new_v5(
178 &NAMESPACE_SEESAW,
179 format!("{}-{}", event.event_id, projection.id).as_bytes(),
180 )
181 .to_string();
182 let ctx = self.make_context(
183 projection.id.clone(),
184 idempotency_key,
185 event.correlation_id,
186 event.event_id,
187 event.parent_id,
188 );
189
190 if let Err(error) = (projection.handler)(any_event, ctx).await {
191 let error_string = error.to_string();
192 warn!(
193 "Projection handler failed: event_id={}, projection_id={}, error={}",
194 event.event_id, projection.id, error_string
195 );
196 projection_failures.push(ProjectionFailure {
197 handler_id: projection.id.clone(),
198 error: error_string,
199 reason: "projection_failed".to_string(),
200 attempts: 1,
201 });
202 }
203 }
204
205 Ok(IntentCommit {
207 event_id: event.event_id,
208 correlation_id: event.correlation_id,
209 event_type: event.event_type.clone(),
210 event_payload: event.payload.clone(),
211 checkpoint: event.position,
212 intents: handler_intents,
213 projection_failures,
214 handler_descriptions,
215 park: None,
216 })
217 }
218
219 pub async fn execute_handler(
221 &self,
222 execution: QueuedHandler,
223 config: &HandlerWorkerConfig,
224 ) -> Result<HandlerResult> {
225 info!(
226 "Processing handler: handler_id={}, workflow={}, priority={}, attempt={}/{}",
227 execution.handler_id,
228 execution.correlation_id,
229 execution.priority,
230 execution.attempts,
231 execution.max_attempts
232 );
233
234 let Some(handler) = self.handlers.find_by_id(&execution.handler_id) else {
236 let error = format!(
237 "No handler registered for id '{}'",
238 execution.handler_id
239 );
240 warn!("{}", error);
241 return Ok(HandlerResult {
242 status: if execution.attempts >= execution.max_attempts {
243 HandlerStatus::Failed {
244 error: error.clone(),
245 attempts: execution.attempts,
246 }
247 } else {
248 HandlerStatus::Retry {
249 error,
250 attempts: execution.attempts,
251 }
252 },
253 emitted_events: Vec::new(),
254 result: serde_json::json!({}),
255
256 log_entries: Vec::new(),
257 });
258 };
259
260 let (typed_event, type_id) =
261 self.decode_event(&execution.event_type, &execution.event_payload, execution.ephemeral.as_ref())?;
262
263 let idempotency_key = Uuid::new_v5(
264 &NAMESPACE_SEESAW,
265 format!("{}-{}", execution.event_id, execution.handler_id).as_bytes(),
266 )
267 .to_string();
268
269 let journal_entries = self
270 .queue
271 .load_journal(&handler.id, execution.event_id)
272 .await?;
273
274 let ctx = self
275 .make_context(
276 handler.id.clone(),
277 idempotency_key,
278 execution.correlation_id,
279 execution.event_id,
280 execution.parent_event_id,
281 )
282 .with_journal(self.queue.clone(), journal_entries);
283
284 let timeout_duration = if execution.timeout_seconds > 0 {
286 std::time::Duration::from_secs(execution.timeout_seconds as u64)
287 } else {
288 config.default_timeout
289 };
290
291 let handler_fut = handler.make_handler_future(typed_event.clone(), type_id, ctx.clone());
292 let result = timeout(timeout_duration, handler_fut)
293 .await;
294
295 match result {
297 Ok(Ok(emitted_raw)) => {
298 let emitted_events = self.serialize_emitted_events(
300 emitted_raw,
301 &execution,
302 )?;
303
304 info!("Handler completed successfully: {}", execution.handler_id);
305 Ok(HandlerResult {
306 status: HandlerStatus::Success,
307 emitted_events,
308 result: serde_json::json!({ "status": "ok" }),
309
310 log_entries: ctx.logger.drain(),
311 })
312 }
313 Ok(Err(e)) => {
314 warn!(
315 "Handler failed: {} (attempt {}/{}): {}",
316 execution.handler_id, execution.attempts, execution.max_attempts, e
317 );
318
319 let status = if execution.attempts >= execution.max_attempts {
320 HandlerStatus::Failed {
321 error: e.to_string(),
322 attempts: execution.attempts,
323 }
324 } else {
325 HandlerStatus::Retry {
326 error: e.to_string(),
327 attempts: execution.attempts,
328 }
329 };
330
331 let emitted_events =
333 if execution.attempts >= execution.max_attempts
334 && (handler.dlq_terminal_mapper.is_some() || self.global_dlq_mapper.is_some())
335 {
336 self.build_dlq_terminal_event(
337 &handler,
338 typed_event,
339 type_id,
340 &execution,
341 "failed",
342 e.to_string(),
343 )?
344 } else {
345 Vec::new()
346 };
347
348 Ok(HandlerResult {
349 status,
350 emitted_events,
351 result: serde_json::json!({}),
352
353 log_entries: ctx.logger.drain(),
354 })
355 }
356 Err(_) => {
357 warn!("Handler timed out: {}", execution.handler_id);
358
359 let timeout_error = "Handler execution timed out".to_string();
360
361 let status = if execution.attempts >= execution.max_attempts {
362 HandlerStatus::Failed {
363 error: timeout_error.clone(),
364 attempts: execution.attempts,
365 }
366 } else {
367 HandlerStatus::Retry {
368 error: timeout_error.clone(),
369 attempts: execution.attempts,
370 }
371 };
372
373 let emitted_events = if execution.attempts >= execution.max_attempts
375 && (handler.dlq_terminal_mapper.is_some() || self.global_dlq_mapper.is_some())
376 {
377 self.build_dlq_terminal_event(
378 &handler,
379 typed_event,
380 type_id,
381 &execution,
382 "timeout",
383 timeout_error,
384 )?
385 } else {
386 Vec::new()
387 };
388
389 Ok(HandlerResult {
390 status,
391 emitted_events,
392 result: serde_json::json!({}),
393
394 log_entries: ctx.logger.drain(),
395 })
396 }
397 }
398 }
399
400 pub async fn run_startup_handlers(&self) -> Result<()> {
402 for h in self.handlers.all() {
403 if h.started.is_none() {
404 continue;
405 }
406
407 let ctx = self.make_context(
408 h.id.clone(),
409 format!("startup::{}", h.id),
410 Uuid::nil(),
411 Uuid::nil(),
412 None,
413 );
414
415 h.call_started(ctx)
416 .await
417 .map_err(|e| anyhow::anyhow!("startup handler '{}' failed: {}", h.id, e))?;
418 }
419 Ok(())
420 }
421
422 pub fn handler_registry(&self) -> &Arc<HandlerRegistry<D>> {
424 &self.handlers
425 }
426
427 fn make_context(
430 &self,
431 handler_id: String,
432 idempotency_key: String,
433 correlation_id: Uuid,
434 event_id: Uuid,
435 parent_event_id: Option<Uuid>,
436 ) -> Context<D> {
437 Context::new(
438 handler_id,
439 idempotency_key,
440 correlation_id,
441 event_id,
442 parent_event_id,
443 self.deps.clone(),
444 )
445 .with_aggregator_registry(self.aggregator_registry.clone())
446 }
447
448 fn decode_event(
449 &self,
450 event_type: &str,
451 payload: &serde_json::Value,
452 ephemeral: Option<&Arc<dyn Any + Send + Sync>>,
453 ) -> Result<(Arc<dyn Any + Send + Sync>, TypeId)> {
454 if let Some(typed) = ephemeral {
458 if self.upcasters.is_empty() {
459 if let Some(codec) = self.handlers.find_codec_by_durable_name(event_type) {
460 if (**typed).type_id() == codec.type_id {
461 return Ok((Arc::clone(typed), codec.type_id));
462 }
463 }
464 }
465 }
466
467 let upcasted = self.upcasters.upcast(event_type, 0, payload.clone())?;
470
471 let codec = self.handlers.find_codec_by_durable_name(event_type);
472
473 if let Some(codec) = codec {
474 let typed = (codec.decode)(&upcasted)?;
475 Ok((typed, codec.type_id))
476 } else {
477 warn!(
478 event_type = %event_type,
479 "No codec registered for event type — falling back to raw JSON. \
480 If this event was emitted by a queued handler, ensure the \
481 receiving handler is registered with the engine."
482 );
483 Ok((Arc::new(upcasted), TypeId::of::<serde_json::Value>()))
484 }
485 }
486
487 pub(crate) fn serialize_emitted_events(
488 &self,
489 emitted: Vec<EventOutput>,
490 execution: &QueuedHandler,
491 ) -> Result<Vec<EmittedEvent>> {
492 let mut result = Vec::with_capacity(emitted.len());
493 for output in emitted {
494 if let Some(codec) = &output.codec {
496 self.handlers.register_codec(codec.clone());
497 }
498
499 result.push(EmittedEvent {
500 durable_name: output.durable_name,
501 event_prefix: output.event_prefix,
502 persistent: output.persistent,
503 payload: output.payload,
504 handler_id: Some(execution.handler_id.clone()),
505 ephemeral: output.ephemeral,
506 });
507 }
508
509 Ok(result)
510 }
511
512 fn build_dlq_terminal_event(
513 &self,
514 handler: &Handler<D>,
515 source_event: Arc<dyn Any + Send + Sync>,
516 source_type_id: TypeId,
517 execution: &QueuedHandler,
518 reason: &str,
519 error: String,
520 ) -> Result<Vec<EmittedEvent>> {
521 let Some(mapper) = handler.dlq_terminal_mapper.as_ref() else {
522 if let Some(global) = self.global_dlq_mapper.as_ref() {
524 let mut emitted = global(DlqTerminalInfo {
525 handler_id: execution.handler_id.clone(),
526 source_event_type: execution.event_type.clone(),
527 source_event_id: execution.event_id,
528 error,
529 reason: reason.to_string(),
530 attempts: execution.attempts,
531 max_attempts: execution.max_attempts,
532 })?;
533 if emitted.handler_id.is_none() {
534 emitted.handler_id = Some(execution.handler_id.clone());
535 }
536 return Ok(vec![emitted]);
537 }
538 return Ok(Vec::new());
539 };
540
541 let mut emitted = mapper(
542 source_event,
543 source_type_id,
544 DlqTerminalInfo {
545 handler_id: execution.handler_id.clone(),
546 source_event_type: execution.event_type.clone(),
547 source_event_id: execution.event_id,
548 error,
549 reason: reason.to_string(),
550 attempts: execution.attempts,
551 max_attempts: execution.max_attempts,
552 },
553 )?;
554
555 if emitted.handler_id.is_none() {
557 emitted.handler_id = Some(execution.handler_id.clone());
558 }
559
560 Ok(vec![emitted])
561 }
562}
563
564#[derive(Debug)]
566pub struct HandlerResult {
567 pub status: HandlerStatus,
568 pub emitted_events: Vec<EmittedEvent>,
569 pub result: serde_json::Value,
570 pub log_entries: Vec<crate::types::LogEntry>,
572}
573
/// Terminal state of one handler execution attempt.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// can compare and copy statuses directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandlerStatus {
    /// The handler returned successfully.
    Success,
    /// The handler failed and no attempts remain.
    Failed { error: String, attempts: i32 },
    /// The handler failed but may be attempted again.
    Retry { error: String, attempts: i32 },
    /// The handler timed out.
    Timeout,
}