// coil_runtime/jobs/host.rs

use std::sync::Arc;

use coil_jobs::DeadLetterId;

use super::super::*;
use super::errors::RuntimeJobsError;
use super::request::{DomainEventDispatchRequest, JobDispatchRequest};
use super::types::{DomainEventDispatch, RuntimeEventSubscriptionDefinition, RuntimeJobDefinition};

8#[derive(Debug, Clone)]
9pub struct JobsHost {
10    pub customer_app: String,
11    pub scheduler_node_id: String,
12    pub runtime: JobsRuntimeServices,
13    pub telemetry: coil_observability::TelemetryCatalog,
14    pub queue_topology: QueueTopology,
15    pub registered_jobs: Vec<RuntimeJobDefinition>,
16    pub registered_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
17    pub jobs_domain: JobsDomain,
18    pub shared_backend_namespace: String,
19    coordinator: JobsCoordinator,
20    next_job_sequence: u64,
21    next_event_sequence: u64,
22}
23
/// Point-in-time count of jobs in each coordinator state.
///
/// `Default` yields an all-zero snapshot (an empty queue).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct JobsQueueSnapshot {
    /// Jobs ready to be leased.
    pub ready: usize,
    /// Jobs waiting for their scheduled time.
    pub scheduled: usize,
    /// Jobs currently leased by a worker.
    pub in_flight: usize,
    /// Jobs parked in the dead-letter store.
    pub dead_letters: usize,
}

32impl JobsHost {
33    #[cfg_attr(not(test), allow(dead_code))]
34    pub(crate) fn new(
35        customer_app: String,
36        scheduler_node_id: String,
37        runtime: JobsRuntimeServices,
38        telemetry: coil_observability::TelemetryCatalog,
39        queue_topology: QueueTopology,
40        registered_jobs: Vec<RuntimeJobDefinition>,
41        registered_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
42        jobs_domain: JobsDomain,
43        shared_runtime: Arc<dyn coil_jobs::JobsCoordinationRuntime>,
44        shared_backend_namespace: String,
45    ) -> Self {
46        let coordinator = runtime.coordinator_with_shared_runtime(shared_runtime);
47        Self {
48            customer_app,
49            scheduler_node_id,
50            runtime,
51            telemetry,
52            queue_topology,
53            registered_jobs,
54            registered_event_subscriptions,
55            jobs_domain,
56            shared_backend_namespace,
57            coordinator,
58            next_job_sequence: 0,
59            next_event_sequence: 0,
60        }
61    }
62
63    pub fn enqueue_spec(
64        &mut self,
65        spec: JobSpec,
66        now: JobInstant,
67    ) -> Result<JobId, RuntimeJobsError> {
68        let job_id = spec.job_id.clone();
69        self.coordinator.enqueue(spec, now)?;
70        self.refresh_observability("jobs.enqueue_spec", "ok", Some(job_id.as_str()), now);
71        Ok(job_id)
72    }
73
74    pub fn retry_dead_letter(
75        &mut self,
76        dead_letter_id: impl Into<String>,
77        now_unix_seconds: u64,
78    ) -> Result<JobId, RuntimeJobsError> {
79        let dead_letter_id = DeadLetterId::new(dead_letter_id.into())?;
80        let now = JobInstant::from_unix_seconds(now_unix_seconds);
81        let record = self.coordinator.retry_dead_letter(&dead_letter_id, now)?;
82        self.refresh_observability(
83            "jobs.retry_dead_letter",
84            "ok",
85            Some(record.spec.job_id.as_str()),
86            now,
87        );
88        Ok(record.spec.job_id)
89    }
90
91    pub fn enqueue_job(
92        &mut self,
93        request: JobDispatchRequest,
94        now: JobInstant,
95    ) -> Result<JobId, RuntimeJobsError> {
96        let Some(definition) = self
97            .registered_jobs
98            .iter()
99            .find(|definition| definition.contract.name == request.job_name)
100            .cloned()
101        else {
102            return Err(RuntimeJobsError::UnknownJob {
103                job: request.job_name,
104            });
105        };
106
107        match definition.contract.trigger {
108            JobTriggerKind::Scheduled if request.scheduled_for.is_none() => {
109                return Err(RuntimeJobsError::ScheduledJobRequiresSchedule {
110                    job: definition.contract.name,
111                });
112            }
113            JobTriggerKind::Scheduled => {}
114            JobTriggerKind::DomainEvent => {
115                return Err(RuntimeJobsError::DomainEventJobRequiresEventDispatch {
116                    job: definition.contract.name,
117                });
118            }
119            trigger if request.scheduled_for.is_some() => {
120                return Err(RuntimeJobsError::UnexpectedSchedule {
121                    job: definition.contract.name,
122                    trigger,
123                });
124            }
125            _ => {}
126        }
127
128        let mut spec = JobSpec::new(
129            self.issue_job_id(&definition.contract.name)?,
130            JobName::new(definition.contract.name.clone())?,
131            definition.queue.clone(),
132            request.payload_description,
133        )?
134        .with_retry_policy(definition.retry_policy.clone());
135
136        if let Some(scheduled_for) = request.scheduled_for {
137            spec = spec.scheduled_for(scheduled_for);
138        }
139
140        match request.idempotency_key {
141            Some(key) => {
142                spec = spec.with_idempotency_key(IdempotencyKey::new(key)?);
143            }
144            None if definition.retry_policy.is_retrying() => {
145                return Err(RuntimeJobsError::MissingIdempotencyKey {
146                    job: definition.contract.name,
147                });
148            }
149            None => {}
150        }
151
152        let job_id = spec.job_id.clone();
153        self.coordinator.enqueue(spec, now)?;
154        self.refresh_observability("jobs.enqueue_job", "ok", Some(job_id.as_str()), now);
155        Ok(job_id)
156    }
157
158    pub fn emit_domain_event(
159        &mut self,
160        request: DomainEventDispatchRequest,
161        now: JobInstant,
162    ) -> Result<DomainEventDispatch, RuntimeJobsError> {
163        let event_type = DomainEventType::new(request.event_type.clone())?;
164        let event_id = self.issue_event_id(&request.event_type)?;
165        let mut envelope = DomainEventEnvelope::new(
166            event_id.clone(),
167            event_type.clone(),
168            request.aggregate_kind,
169            request.aggregate_id,
170            now,
171            request.payload_description,
172        )?;
173
174        if let Some(correlation_id) = request.correlation_id {
175            envelope = envelope.with_correlation_id(correlation_id)?;
176        }
177
178        if let Some(causation_id) = request.causation_id {
179            envelope = envelope.with_causation_id(causation_id)?;
180        }
181
182        let mut enqueued_jobs = Vec::new();
183        for subscription in self
184            .registered_event_subscriptions
185            .iter()
186            .filter(|subscription| subscription.event_type == event_type)
187            .cloned()
188        {
189            let mut spec = JobSpec::new(
190                JobId::new(format!(
191                    "event:{}:{}",
192                    event_id.as_str(),
193                    subscription.subscription_id.as_str()
194                ))?,
195                JobName::new(format!("event-handler:{}", subscription.job_name))?,
196                subscription.reaction_queue,
197                format!(
198                    "dispatch {} for {}:{}",
199                    event_type.as_str(),
200                    envelope.aggregate_kind,
201                    envelope.aggregate_id
202                ),
203            )?
204            .with_retry_policy(subscription.retry_policy.clone());
205
206            if subscription.retry_policy.is_retrying() {
207                spec = spec.with_idempotency_key(IdempotencyKey::new(format!(
208                    "event:{}:{}:{}",
209                    event_id.as_str(),
210                    subscription.module,
211                    subscription.job_name
212                ))?);
213            }
214
215            let job_id = spec.job_id.clone();
216            self.coordinator.enqueue(spec, now)?;
217            enqueued_jobs.push(job_id);
218        }
219
220        self.refresh_observability("jobs.emit_domain_event", "ok", Some(event_id.as_str()), now);
221
222        Ok(DomainEventDispatch {
223            event_id,
224            event_type,
225            enqueued_jobs,
226        })
227    }
228
229    pub fn acquire_scheduler_leadership(
230        &mut self,
231        now: JobInstant,
232        lease_ttl: std::time::Duration,
233    ) -> Result<SchedulerLeadership, RuntimeJobsError> {
234        let leadership = self.coordinator.acquire_scheduler_leadership(
235            self.scheduler_node_id.clone(),
236            now,
237            lease_ttl,
238        )?;
239        self.refresh_observability("jobs.acquire_scheduler_leadership", "ok", None, now);
240        Ok(leadership)
241    }
242
243    pub fn promote_due_jobs(&mut self, now: JobInstant) -> Result<Vec<JobId>, RuntimeJobsError> {
244        let promoted = self
245            .coordinator
246            .promote_due_jobs(&self.scheduler_node_id, now)?;
247        self.refresh_observability("jobs.promote_due_jobs", "ok", None, now);
248        Ok(promoted)
249    }
250
251    pub fn lease_ready_jobs(
252        &mut self,
253        queue: &JobQueueName,
254        worker_id: impl Into<String>,
255        now: JobInstant,
256        lease_ttl: std::time::Duration,
257        max_jobs: usize,
258    ) -> Result<Vec<JobLease>, RuntimeJobsError> {
259        let leased = self
260            .coordinator
261            .lease_ready_jobs(queue, worker_id, now, lease_ttl, max_jobs)?;
262        self.refresh_observability("jobs.lease_ready_jobs", "ok", None, now);
263        Ok(leased)
264    }
265
266    pub fn acknowledge_completed(
267        &mut self,
268        lease: &JobLease,
269        now: JobInstant,
270    ) -> Result<(), RuntimeJobsError> {
271        self.coordinator.acknowledge_completed(lease, now)?;
272        self.refresh_observability(
273            "jobs.acknowledge_completed",
274            "ok",
275            Some(lease.record.spec.job_id.as_str()),
276            now,
277        );
278        Ok(())
279    }
280
281    pub fn acknowledge_failed(
282        &mut self,
283        lease: &JobLease,
284        now: JobInstant,
285        reason: DeadLetterReason,
286        error_message: impl Into<String>,
287    ) -> Result<JobFailureDisposition, RuntimeJobsError> {
288        let outcome = self
289            .coordinator
290            .acknowledge_failed(lease, now, reason, error_message.into())?;
291        self.refresh_observability(
292            "jobs.acknowledge_failed",
293            "ok",
294            Some(lease.record.spec.job_id.as_str()),
295            now,
296        );
297        Ok(outcome)
298    }
299
300    pub fn coordinator(&self) -> &JobsCoordinator {
301        &self.coordinator
302    }
303
304    pub fn queue_snapshot(&mut self) -> JobsQueueSnapshot {
305        self.coordinator.refresh();
306        JobsQueueSnapshot {
307            ready: self.coordinator.ready_jobs().len(),
308            scheduled: self.coordinator.scheduled_jobs().len(),
309            in_flight: self.coordinator.in_flight_jobs().len(),
310            dead_letters: self.coordinator.dead_letters().len(),
311        }
312    }
313
314    fn issue_job_id(&mut self, job_name: &str) -> Result<JobId, RuntimeJobsError> {
315        self.next_job_sequence += 1;
316        Ok(JobId::new(format!(
317            "job:{}:{}",
318            job_name, self.next_job_sequence
319        ))?)
320    }
321
322    fn issue_event_id(&mut self, event_type: &str) -> Result<DomainEventId, RuntimeJobsError> {
323        self.next_event_sequence += 1;
324        Ok(DomainEventId::new(format!(
325            "evt:{}:{}",
326            event_type, self.next_event_sequence
327        ))?)
328    }
329
330    fn refresh_observability(
331        &mut self,
332        span: &str,
333        outcome: &str,
334        trace_id: Option<&str>,
335        now: JobInstant,
336    ) {
337        let snapshot = self.queue_snapshot();
338        let depth = snapshot.ready + snapshot.scheduled + snapshot.in_flight;
339        let _ = self
340            .telemetry
341            .set_gauge("coil.queue.depth", depth.min(i64::MAX as usize) as i64);
342        let _ = self.telemetry.record_trace(
343            coil_observability::TraceRecord::new(
344                trace_id
345                    .map(ToOwned::to_owned)
346                    .unwrap_or_else(|| format!("{}:{}", span, self.scheduler_node_id)),
347                span,
348                outcome,
349                now.as_unix_seconds(),
350            )
351            .with_field("customer_app", self.customer_app.clone())
352            .with_field("scheduler_node_id", self.scheduler_node_id.clone())
353            .with_field("ready", snapshot.ready.to_string())
354            .with_field("scheduled", snapshot.scheduled.to_string())
355            .with_field("in_flight", snapshot.in_flight.to_string())
356            .with_field("dead_letters", snapshot.dead_letters.to_string()),
357        );
358    }
359}