1use super::super::*;
2use super::errors::RuntimeJobsError;
3use super::request::{DomainEventDispatchRequest, JobDispatchRequest};
4use super::types::{DomainEventDispatch, RuntimeEventSubscriptionDefinition, RuntimeJobDefinition};
5use coil_jobs::DeadLetterId;
6use std::sync::Arc;
7
8#[derive(Debug, Clone)]
9pub struct JobsHost {
10 pub customer_app: String,
11 pub scheduler_node_id: String,
12 pub runtime: JobsRuntimeServices,
13 pub telemetry: coil_observability::TelemetryCatalog,
14 pub queue_topology: QueueTopology,
15 pub registered_jobs: Vec<RuntimeJobDefinition>,
16 pub registered_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
17 pub jobs_domain: JobsDomain,
18 pub shared_backend_namespace: String,
19 coordinator: JobsCoordinator,
20 next_job_sequence: u64,
21 next_event_sequence: u64,
22}
23
/// Counts of the coordinator's job collections, taken right after a refresh.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JobsQueueSnapshot {
    /// Length of the coordinator's `ready_jobs()` collection.
    pub ready: usize,
    /// Length of the coordinator's `scheduled_jobs()` collection.
    pub scheduled: usize,
    /// Length of the coordinator's `in_flight_jobs()` collection.
    pub in_flight: usize,
    /// Length of the coordinator's `dead_letters()` collection.
    pub dead_letters: usize,
}
31
32impl JobsHost {
33 #[cfg_attr(not(test), allow(dead_code))]
34 pub(crate) fn new(
35 customer_app: String,
36 scheduler_node_id: String,
37 runtime: JobsRuntimeServices,
38 telemetry: coil_observability::TelemetryCatalog,
39 queue_topology: QueueTopology,
40 registered_jobs: Vec<RuntimeJobDefinition>,
41 registered_event_subscriptions: Vec<RuntimeEventSubscriptionDefinition>,
42 jobs_domain: JobsDomain,
43 shared_runtime: Arc<dyn coil_jobs::JobsCoordinationRuntime>,
44 shared_backend_namespace: String,
45 ) -> Self {
46 let coordinator = runtime.coordinator_with_shared_runtime(shared_runtime);
47 Self {
48 customer_app,
49 scheduler_node_id,
50 runtime,
51 telemetry,
52 queue_topology,
53 registered_jobs,
54 registered_event_subscriptions,
55 jobs_domain,
56 shared_backend_namespace,
57 coordinator,
58 next_job_sequence: 0,
59 next_event_sequence: 0,
60 }
61 }
62
63 pub fn enqueue_spec(
64 &mut self,
65 spec: JobSpec,
66 now: JobInstant,
67 ) -> Result<JobId, RuntimeJobsError> {
68 let job_id = spec.job_id.clone();
69 self.coordinator.enqueue(spec, now)?;
70 self.refresh_observability("jobs.enqueue_spec", "ok", Some(job_id.as_str()), now);
71 Ok(job_id)
72 }
73
74 pub fn retry_dead_letter(
75 &mut self,
76 dead_letter_id: impl Into<String>,
77 now_unix_seconds: u64,
78 ) -> Result<JobId, RuntimeJobsError> {
79 let dead_letter_id = DeadLetterId::new(dead_letter_id.into())?;
80 let now = JobInstant::from_unix_seconds(now_unix_seconds);
81 let record = self.coordinator.retry_dead_letter(&dead_letter_id, now)?;
82 self.refresh_observability(
83 "jobs.retry_dead_letter",
84 "ok",
85 Some(record.spec.job_id.as_str()),
86 now,
87 );
88 Ok(record.spec.job_id)
89 }
90
91 pub fn enqueue_job(
92 &mut self,
93 request: JobDispatchRequest,
94 now: JobInstant,
95 ) -> Result<JobId, RuntimeJobsError> {
96 let Some(definition) = self
97 .registered_jobs
98 .iter()
99 .find(|definition| definition.contract.name == request.job_name)
100 .cloned()
101 else {
102 return Err(RuntimeJobsError::UnknownJob {
103 job: request.job_name,
104 });
105 };
106
107 match definition.contract.trigger {
108 JobTriggerKind::Scheduled if request.scheduled_for.is_none() => {
109 return Err(RuntimeJobsError::ScheduledJobRequiresSchedule {
110 job: definition.contract.name,
111 });
112 }
113 JobTriggerKind::Scheduled => {}
114 JobTriggerKind::DomainEvent => {
115 return Err(RuntimeJobsError::DomainEventJobRequiresEventDispatch {
116 job: definition.contract.name,
117 });
118 }
119 trigger if request.scheduled_for.is_some() => {
120 return Err(RuntimeJobsError::UnexpectedSchedule {
121 job: definition.contract.name,
122 trigger,
123 });
124 }
125 _ => {}
126 }
127
128 let mut spec = JobSpec::new(
129 self.issue_job_id(&definition.contract.name)?,
130 JobName::new(definition.contract.name.clone())?,
131 definition.queue.clone(),
132 request.payload_description,
133 )?
134 .with_retry_policy(definition.retry_policy.clone());
135
136 if let Some(scheduled_for) = request.scheduled_for {
137 spec = spec.scheduled_for(scheduled_for);
138 }
139
140 match request.idempotency_key {
141 Some(key) => {
142 spec = spec.with_idempotency_key(IdempotencyKey::new(key)?);
143 }
144 None if definition.retry_policy.is_retrying() => {
145 return Err(RuntimeJobsError::MissingIdempotencyKey {
146 job: definition.contract.name,
147 });
148 }
149 None => {}
150 }
151
152 let job_id = spec.job_id.clone();
153 self.coordinator.enqueue(spec, now)?;
154 self.refresh_observability("jobs.enqueue_job", "ok", Some(job_id.as_str()), now);
155 Ok(job_id)
156 }
157
158 pub fn emit_domain_event(
159 &mut self,
160 request: DomainEventDispatchRequest,
161 now: JobInstant,
162 ) -> Result<DomainEventDispatch, RuntimeJobsError> {
163 let event_type = DomainEventType::new(request.event_type.clone())?;
164 let event_id = self.issue_event_id(&request.event_type)?;
165 let mut envelope = DomainEventEnvelope::new(
166 event_id.clone(),
167 event_type.clone(),
168 request.aggregate_kind,
169 request.aggregate_id,
170 now,
171 request.payload_description,
172 )?;
173
174 if let Some(correlation_id) = request.correlation_id {
175 envelope = envelope.with_correlation_id(correlation_id)?;
176 }
177
178 if let Some(causation_id) = request.causation_id {
179 envelope = envelope.with_causation_id(causation_id)?;
180 }
181
182 let mut enqueued_jobs = Vec::new();
183 for subscription in self
184 .registered_event_subscriptions
185 .iter()
186 .filter(|subscription| subscription.event_type == event_type)
187 .cloned()
188 {
189 let mut spec = JobSpec::new(
190 JobId::new(format!(
191 "event:{}:{}",
192 event_id.as_str(),
193 subscription.subscription_id.as_str()
194 ))?,
195 JobName::new(format!("event-handler:{}", subscription.job_name))?,
196 subscription.reaction_queue,
197 format!(
198 "dispatch {} for {}:{}",
199 event_type.as_str(),
200 envelope.aggregate_kind,
201 envelope.aggregate_id
202 ),
203 )?
204 .with_retry_policy(subscription.retry_policy.clone());
205
206 if subscription.retry_policy.is_retrying() {
207 spec = spec.with_idempotency_key(IdempotencyKey::new(format!(
208 "event:{}:{}:{}",
209 event_id.as_str(),
210 subscription.module,
211 subscription.job_name
212 ))?);
213 }
214
215 let job_id = spec.job_id.clone();
216 self.coordinator.enqueue(spec, now)?;
217 enqueued_jobs.push(job_id);
218 }
219
220 self.refresh_observability("jobs.emit_domain_event", "ok", Some(event_id.as_str()), now);
221
222 Ok(DomainEventDispatch {
223 event_id,
224 event_type,
225 enqueued_jobs,
226 })
227 }
228
229 pub fn acquire_scheduler_leadership(
230 &mut self,
231 now: JobInstant,
232 lease_ttl: std::time::Duration,
233 ) -> Result<SchedulerLeadership, RuntimeJobsError> {
234 let leadership = self.coordinator.acquire_scheduler_leadership(
235 self.scheduler_node_id.clone(),
236 now,
237 lease_ttl,
238 )?;
239 self.refresh_observability("jobs.acquire_scheduler_leadership", "ok", None, now);
240 Ok(leadership)
241 }
242
243 pub fn promote_due_jobs(&mut self, now: JobInstant) -> Result<Vec<JobId>, RuntimeJobsError> {
244 let promoted = self
245 .coordinator
246 .promote_due_jobs(&self.scheduler_node_id, now)?;
247 self.refresh_observability("jobs.promote_due_jobs", "ok", None, now);
248 Ok(promoted)
249 }
250
251 pub fn lease_ready_jobs(
252 &mut self,
253 queue: &JobQueueName,
254 worker_id: impl Into<String>,
255 now: JobInstant,
256 lease_ttl: std::time::Duration,
257 max_jobs: usize,
258 ) -> Result<Vec<JobLease>, RuntimeJobsError> {
259 let leased = self
260 .coordinator
261 .lease_ready_jobs(queue, worker_id, now, lease_ttl, max_jobs)?;
262 self.refresh_observability("jobs.lease_ready_jobs", "ok", None, now);
263 Ok(leased)
264 }
265
266 pub fn acknowledge_completed(
267 &mut self,
268 lease: &JobLease,
269 now: JobInstant,
270 ) -> Result<(), RuntimeJobsError> {
271 self.coordinator.acknowledge_completed(lease, now)?;
272 self.refresh_observability(
273 "jobs.acknowledge_completed",
274 "ok",
275 Some(lease.record.spec.job_id.as_str()),
276 now,
277 );
278 Ok(())
279 }
280
281 pub fn acknowledge_failed(
282 &mut self,
283 lease: &JobLease,
284 now: JobInstant,
285 reason: DeadLetterReason,
286 error_message: impl Into<String>,
287 ) -> Result<JobFailureDisposition, RuntimeJobsError> {
288 let outcome = self
289 .coordinator
290 .acknowledge_failed(lease, now, reason, error_message.into())?;
291 self.refresh_observability(
292 "jobs.acknowledge_failed",
293 "ok",
294 Some(lease.record.spec.job_id.as_str()),
295 now,
296 );
297 Ok(outcome)
298 }
299
300 pub fn coordinator(&self) -> &JobsCoordinator {
301 &self.coordinator
302 }
303
304 pub fn queue_snapshot(&mut self) -> JobsQueueSnapshot {
305 self.coordinator.refresh();
306 JobsQueueSnapshot {
307 ready: self.coordinator.ready_jobs().len(),
308 scheduled: self.coordinator.scheduled_jobs().len(),
309 in_flight: self.coordinator.in_flight_jobs().len(),
310 dead_letters: self.coordinator.dead_letters().len(),
311 }
312 }
313
314 fn issue_job_id(&mut self, job_name: &str) -> Result<JobId, RuntimeJobsError> {
315 self.next_job_sequence += 1;
316 Ok(JobId::new(format!(
317 "job:{}:{}",
318 job_name, self.next_job_sequence
319 ))?)
320 }
321
322 fn issue_event_id(&mut self, event_type: &str) -> Result<DomainEventId, RuntimeJobsError> {
323 self.next_event_sequence += 1;
324 Ok(DomainEventId::new(format!(
325 "evt:{}:{}",
326 event_type, self.next_event_sequence
327 ))?)
328 }
329
330 fn refresh_observability(
331 &mut self,
332 span: &str,
333 outcome: &str,
334 trace_id: Option<&str>,
335 now: JobInstant,
336 ) {
337 let snapshot = self.queue_snapshot();
338 let depth = snapshot.ready + snapshot.scheduled + snapshot.in_flight;
339 let _ = self
340 .telemetry
341 .set_gauge("coil.queue.depth", depth.min(i64::MAX as usize) as i64);
342 let _ = self.telemetry.record_trace(
343 coil_observability::TraceRecord::new(
344 trace_id
345 .map(ToOwned::to_owned)
346 .unwrap_or_else(|| format!("{}:{}", span, self.scheduler_node_id)),
347 span,
348 outcome,
349 now.as_unix_seconds(),
350 )
351 .with_field("customer_app", self.customer_app.clone())
352 .with_field("scheduler_node_id", self.scheduler_node_id.clone())
353 .with_field("ready", snapshot.ready.to_string())
354 .with_field("scheduled", snapshot.scheduled.to_string())
355 .with_field("in_flight", snapshot.in_flight.to_string())
356 .with_field("dead_letters", snapshot.dead_letters.to_string()),
357 );
358 }
359}