1pub const MIXED_WORK_PROTOCOL_SCHEMA_VERSION: u32 = 1;
11
12const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
13const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
17pub enum MixedWorkQueueClass {
18 Scan,
20 Graph,
22 Parser,
24 Flow,
26 Control,
28}
29
30impl MixedWorkQueueClass {
31 #[must_use]
33 pub const fn as_str(self) -> &'static str {
34 match self {
35 Self::Scan => "scan",
36 Self::Graph => "graph",
37 Self::Parser => "parser",
38 Self::Flow => "flow",
39 Self::Control => "control",
40 }
41 }
42
43 const fn tag(self) -> u64 {
44 match self {
45 Self::Scan => 1,
46 Self::Graph => 2,
47 Self::Parser => 3,
48 Self::Flow => 4,
49 Self::Control => 5,
50 }
51 }
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
56pub enum MixedWorkUnitType {
57 ScanChunk,
59 ScanVerifier,
61 GraphFrontier,
63 GraphCompaction,
65 ParserShard,
67 ParserChangedRange,
69 FlowRelationDelta,
71 FlowFixpointStep,
73 DrainSentinel,
75}
76
77impl MixedWorkUnitType {
78 #[must_use]
80 pub const fn as_str(self) -> &'static str {
81 match self {
82 Self::ScanChunk => "scan_chunk",
83 Self::ScanVerifier => "scan_verifier",
84 Self::GraphFrontier => "graph_frontier",
85 Self::GraphCompaction => "graph_compaction",
86 Self::ParserShard => "parser_shard",
87 Self::ParserChangedRange => "parser_changed_range",
88 Self::FlowRelationDelta => "flow_relation_delta",
89 Self::FlowFixpointStep => "flow_fixpoint_step",
90 Self::DrainSentinel => "drain_sentinel",
91 }
92 }
93
94 const fn tag(self) -> u64 {
95 match self {
96 Self::ScanChunk => 11,
97 Self::ScanVerifier => 12,
98 Self::GraphFrontier => 21,
99 Self::GraphCompaction => 22,
100 Self::ParserShard => 31,
101 Self::ParserChangedRange => 32,
102 Self::FlowRelationDelta => 41,
103 Self::FlowFixpointStep => 42,
104 Self::DrainSentinel => 51,
105 }
106 }
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
111pub struct ResidentArtifactId(pub u32);
112
113impl ResidentArtifactId {
114 #[must_use]
116 pub const fn is_valid(self) -> bool {
117 self.0 != 0
118 }
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
123pub struct OutputSlabId(pub u32);
124
125impl OutputSlabId {
126 #[must_use]
128 pub const fn is_valid(self) -> bool {
129 self.0 != 0
130 }
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
135pub struct MixedWorkUnit {
136 pub sequence: u64,
138 pub queue_class: MixedWorkQueueClass,
140 pub unit_type: MixedWorkUnitType,
142 pub resident_artifact_id: ResidentArtifactId,
144 pub output_slab_id: OutputSlabId,
146 pub watchdog_budget_ticks: u32,
148 pub payload_digest: u64,
150}
151
152impl MixedWorkUnit {
153 #[must_use]
155 pub const fn new(
156 sequence: u64,
157 queue_class: MixedWorkQueueClass,
158 unit_type: MixedWorkUnitType,
159 resident_artifact_id: ResidentArtifactId,
160 output_slab_id: OutputSlabId,
161 watchdog_budget_ticks: u32,
162 payload_digest: u64,
163 ) -> Self {
164 Self {
165 sequence,
166 queue_class,
167 unit_type,
168 resident_artifact_id,
169 output_slab_id,
170 watchdog_budget_ticks,
171 payload_digest,
172 }
173 }
174}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
178pub struct MixedWorkProtocolPlan<'a> {
179 pub units: &'a [MixedWorkUnit],
181 pub drain_watchdog_budget_ticks: u64,
183}
184
185impl<'a> MixedWorkProtocolPlan<'a> {
186 #[must_use]
188 pub const fn new(units: &'a [MixedWorkUnit], drain_watchdog_budget_ticks: u64) -> Self {
189 Self {
190 units,
191 drain_watchdog_budget_ticks,
192 }
193 }
194}
195
196#[derive(Debug, Clone, Copy, PartialEq, Eq)]
198pub struct MixedWorkProtocolEvidence {
199 pub schema_version: u32,
201 pub unit_count: u32,
203 pub scan_units: u32,
205 pub graph_units: u32,
207 pub parser_units: u32,
209 pub flow_units: u32,
211 pub control_units: u32,
213 pub total_watchdog_budget_ticks: u64,
215 pub max_watchdog_budget_ticks: u32,
217 pub drain_watchdog_budget_ticks: u64,
219 pub bounded_drain: bool,
221 pub hidden_host_loop_count: u32,
223 pub deterministic_output_digest: u64,
225}
226
227impl MixedWorkProtocolEvidence {
228 #[must_use]
230 pub const fn covers_scan_graph_parser_flow(self) -> bool {
231 self.scan_units != 0 && self.graph_units != 0 && self.parser_units != 0 && self.flow_units != 0
232 }
233
234 #[must_use]
236 pub const fn is_complete(self) -> bool {
237 self.schema_version == MIXED_WORK_PROTOCOL_SCHEMA_VERSION
238 && self.unit_count != 0
239 && self.bounded_drain
240 && self.hidden_host_loop_count == 0
241 && self.deterministic_output_digest != 0
242 }
243}
244
245#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
247#[non_exhaustive]
248pub enum MixedWorkProtocolError {
249 #[error("mixed-work plan is empty. Fix: publish at least one resident work unit before scheduling.")]
251 EmptyPlan,
252 #[error("mixed-work drain watchdog budget is zero. Fix: provide a positive resident drain budget.")]
254 ZeroDrainWatchdogBudget,
255 #[error("mixed-work unit {sequence} has zero watchdog budget. Fix: assign a positive per-unit watchdog budget.")]
257 ZeroUnitWatchdogBudget {
258 sequence: u64,
260 },
261 #[error("mixed-work unit {sequence} has resident artifact id 0. Fix: publish a resident artifact before queueing work.")]
263 ZeroResidentArtifactId {
264 sequence: u64,
266 },
267 #[error("mixed-work unit {sequence} has output slab id 0. Fix: allocate a resident output slab before queueing work.")]
269 ZeroOutputSlabId {
270 sequence: u64,
272 },
273 #[error(
275 "mixed-work unit {sequence} routes {unit_type} through {queue_class}. Fix: use a unit type owned by the queue class."
276 )]
277 QueueClassMismatch {
278 sequence: u64,
280 queue_class: &'static str,
282 unit_type: &'static str,
284 },
285 #[error("mixed-work unit count {unit_count} overflows u32 evidence. Fix: shard the resident batch.")]
287 UnitCountOverflow {
288 unit_count: usize,
290 },
291 #[error("mixed-work {queue_class} unit count overflowed u32 evidence. Fix: shard that queue class.")]
293 ClassCountOverflow {
294 queue_class: &'static str,
296 },
297 #[error("mixed-work watchdog budget sum overflowed u64. Fix: shard the resident batch.")]
299 WatchdogBudgetOverflow,
300 #[error(
302 "mixed-work watchdog budget {total_watchdog_budget_ticks} exceeds drain budget {drain_watchdog_budget_ticks}. Fix: increase the drain budget or shard the resident batch."
303 )]
304 WatchdogBudgetExceeded {
305 total_watchdog_budget_ticks: u64,
307 drain_watchdog_budget_ticks: u64,
309 },
310}
311
312pub fn mixed_work_protocol_evidence(
320 plan: &MixedWorkProtocolPlan<'_>,
321) -> Result<MixedWorkProtocolEvidence, MixedWorkProtocolError> {
322 validate_mixed_work_protocol(plan)
323}
324
325pub fn validate_mixed_work_protocol(
332 plan: &MixedWorkProtocolPlan<'_>,
333) -> Result<MixedWorkProtocolEvidence, MixedWorkProtocolError> {
334 if plan.units.is_empty() {
335 return Err(MixedWorkProtocolError::EmptyPlan);
336 }
337 if plan.drain_watchdog_budget_ticks == 0 {
338 return Err(MixedWorkProtocolError::ZeroDrainWatchdogBudget);
339 }
340 if plan.units.len() > u32::MAX as usize {
341 return Err(MixedWorkProtocolError::UnitCountOverflow {
342 unit_count: plan.units.len(),
343 });
344 }
345
346 let mut counts = [0_u32; 5];
347 let mut total_watchdog_budget_ticks = 0_u64;
348 let mut max_watchdog_budget_ticks = 0_u32;
349 let mut digest = FNV_OFFSET;
350
351 for unit in plan.units {
352 validate_unit(*unit)?;
353 bump_class_count(&mut counts, unit.queue_class)?;
354 total_watchdog_budget_ticks = total_watchdog_budget_ticks
355 .checked_add(u64::from(unit.watchdog_budget_ticks))
356 .ok_or(MixedWorkProtocolError::WatchdogBudgetOverflow)?;
357 max_watchdog_budget_ticks = max_watchdog_budget_ticks.max(unit.watchdog_budget_ticks);
358 digest = mix_unit_digest(digest, *unit);
359 }
360
361 if total_watchdog_budget_ticks > plan.drain_watchdog_budget_ticks {
362 return Err(MixedWorkProtocolError::WatchdogBudgetExceeded {
363 total_watchdog_budget_ticks,
364 drain_watchdog_budget_ticks: plan.drain_watchdog_budget_ticks,
365 });
366 }
367
368 Ok(MixedWorkProtocolEvidence {
369 schema_version: MIXED_WORK_PROTOCOL_SCHEMA_VERSION,
370 unit_count: plan.units.len() as u32,
371 scan_units: counts[0],
372 graph_units: counts[1],
373 parser_units: counts[2],
374 flow_units: counts[3],
375 control_units: counts[4],
376 total_watchdog_budget_ticks,
377 max_watchdog_budget_ticks,
378 drain_watchdog_budget_ticks: plan.drain_watchdog_budget_ticks,
379 bounded_drain: true,
380 hidden_host_loop_count: 0,
381 deterministic_output_digest: digest,
382 })
383}
384
385fn validate_unit(unit: MixedWorkUnit) -> Result<(), MixedWorkProtocolError> {
386 if unit.watchdog_budget_ticks == 0 {
387 return Err(MixedWorkProtocolError::ZeroUnitWatchdogBudget {
388 sequence: unit.sequence,
389 });
390 }
391 if !unit.resident_artifact_id.is_valid() {
392 return Err(MixedWorkProtocolError::ZeroResidentArtifactId {
393 sequence: unit.sequence,
394 });
395 }
396 if !unit.output_slab_id.is_valid() {
397 return Err(MixedWorkProtocolError::ZeroOutputSlabId {
398 sequence: unit.sequence,
399 });
400 }
401 if !unit_type_matches_queue(unit.queue_class, unit.unit_type) {
402 return Err(MixedWorkProtocolError::QueueClassMismatch {
403 sequence: unit.sequence,
404 queue_class: unit.queue_class.as_str(),
405 unit_type: unit.unit_type.as_str(),
406 });
407 }
408 Ok(())
409}
410
411const fn unit_type_matches_queue(
412 queue_class: MixedWorkQueueClass,
413 unit_type: MixedWorkUnitType,
414) -> bool {
415 matches!(
416 (queue_class, unit_type),
417 (MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk)
418 | (MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanVerifier)
419 | (MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphFrontier)
420 | (MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphCompaction)
421 | (MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserShard)
422 | (MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserChangedRange)
423 | (MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta)
424 | (MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowFixpointStep)
425 | (MixedWorkQueueClass::Control, MixedWorkUnitType::DrainSentinel)
426 )
427}
428
429fn bump_class_count(
430 counts: &mut [u32; 5],
431 queue_class: MixedWorkQueueClass,
432) -> Result<(), MixedWorkProtocolError> {
433 let index = match queue_class {
434 MixedWorkQueueClass::Scan => 0,
435 MixedWorkQueueClass::Graph => 1,
436 MixedWorkQueueClass::Parser => 2,
437 MixedWorkQueueClass::Flow => 3,
438 MixedWorkQueueClass::Control => 4,
439 };
440 counts[index] = counts[index]
441 .checked_add(1)
442 .ok_or(MixedWorkProtocolError::ClassCountOverflow {
443 queue_class: queue_class.as_str(),
444 })?;
445 Ok(())
446}
447
448fn mix_unit_digest(mut digest: u64, unit: MixedWorkUnit) -> u64 {
449 digest = fnv_mix(digest, unit.sequence);
450 digest = fnv_mix(digest, unit.queue_class.tag());
451 digest = fnv_mix(digest, unit.unit_type.tag());
452 digest = fnv_mix(digest, u64::from(unit.resident_artifact_id.0));
453 digest = fnv_mix(digest, u64::from(unit.output_slab_id.0));
454 digest = fnv_mix(digest, u64::from(unit.watchdog_budget_ticks));
455 fnv_mix(digest, unit.payload_digest)
456}
457
458fn fnv_mix(mut digest: u64, value: u64) -> u64 {
459 for byte in value.to_le_bytes() {
460 digest ^= u64::from(byte);
461 digest = digest.wrapping_mul(FNV_PRIME);
462 }
463 digest
464}
465
466#[cfg(test)]
467mod tests {
468 use super::{
469 mixed_work_protocol_evidence, validate_mixed_work_protocol, MixedWorkProtocolError,
470 MixedWorkProtocolPlan, MixedWorkQueueClass, MixedWorkUnit, MixedWorkUnitType,
471 OutputSlabId, ResidentArtifactId, MIXED_WORK_PROTOCOL_SCHEMA_VERSION,
472 };
473
474 fn unit(
475 sequence: u64,
476 queue_class: MixedWorkQueueClass,
477 unit_type: MixedWorkUnitType,
478 ) -> MixedWorkUnit {
479 MixedWorkUnit::new(
480 sequence,
481 queue_class,
482 unit_type,
483 ResidentArtifactId(100 + sequence as u32),
484 OutputSlabId(200 + sequence as u32),
485 10,
486 0xfeed_0000 + sequence,
487 )
488 }
489
490 #[test]
491 fn mixed_scan_graph_parser_flow_work_emits_deterministic_bounded_drain_evidence() {
492 let units = [
493 unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk),
494 unit(2, MixedWorkQueueClass::Graph, MixedWorkUnitType::GraphFrontier),
495 unit(3, MixedWorkQueueClass::Parser, MixedWorkUnitType::ParserShard),
496 unit(4, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta),
497 unit(5, MixedWorkQueueClass::Control, MixedWorkUnitType::DrainSentinel),
498 ];
499 let plan = MixedWorkProtocolPlan::new(&units, 64);
500
501 let first = mixed_work_protocol_evidence(&plan)
502 .expect("Fix: valid mixed-work plan should emit evidence");
503 let second = validate_mixed_work_protocol(&plan)
504 .expect("Fix: valid mixed-work plan should emit stable evidence");
505
506 assert_eq!(first, second);
507 assert_eq!(first.schema_version, MIXED_WORK_PROTOCOL_SCHEMA_VERSION);
508 assert!(first.is_complete());
509 assert!(first.covers_scan_graph_parser_flow());
510 assert!(first.bounded_drain);
511 assert_eq!(first.hidden_host_loop_count, 0);
512 assert_eq!(first.unit_count, 5);
513 assert_eq!(first.total_watchdog_budget_ticks, 50);
514 assert_eq!(first.max_watchdog_budget_ticks, 10);
515 assert_ne!(first.deterministic_output_digest, 0);
516 }
517
518 #[test]
519 fn zero_watchdog_budget_is_rejected() {
520 let units = [MixedWorkUnit::new(
521 7,
522 MixedWorkQueueClass::Scan,
523 MixedWorkUnitType::ScanChunk,
524 ResidentArtifactId(1),
525 OutputSlabId(1),
526 0,
527 9,
528 )];
529 let plan = MixedWorkProtocolPlan::new(&units, 1);
530
531 assert!(matches!(
532 validate_mixed_work_protocol(&plan),
533 Err(MixedWorkProtocolError::ZeroUnitWatchdogBudget { sequence: 7 })
534 ));
535 }
536
537 #[test]
538 fn class_unit_mismatch_is_rejected() {
539 let units = [MixedWorkUnit::new(
540 9,
541 MixedWorkQueueClass::Parser,
542 MixedWorkUnitType::FlowFixpointStep,
543 ResidentArtifactId(1),
544 OutputSlabId(1),
545 1,
546 9,
547 )];
548 let plan = MixedWorkProtocolPlan::new(&units, 1);
549
550 assert!(matches!(
551 validate_mixed_work_protocol(&plan),
552 Err(MixedWorkProtocolError::QueueClassMismatch {
553 sequence: 9,
554 queue_class: "parser",
555 unit_type: "flow_fixpoint_step"
556 })
557 ));
558 }
559
560 #[test]
561 fn drain_budget_must_bound_all_units() {
562 let units = [
563 unit(1, MixedWorkQueueClass::Scan, MixedWorkUnitType::ScanChunk),
564 unit(2, MixedWorkQueueClass::Flow, MixedWorkUnitType::FlowRelationDelta),
565 ];
566 let plan = MixedWorkProtocolPlan::new(&units, 19);
567
568 assert!(matches!(
569 validate_mixed_work_protocol(&plan),
570 Err(MixedWorkProtocolError::WatchdogBudgetExceeded {
571 total_watchdog_budget_ticks: 20,
572 drain_watchdog_budget_ticks: 19
573 })
574 ));
575 }
576
577 #[test]
578 fn resident_artifact_and_output_slab_ids_are_required() {
579 let bad_artifact = [MixedWorkUnit::new(
580 1,
581 MixedWorkQueueClass::Scan,
582 MixedWorkUnitType::ScanChunk,
583 ResidentArtifactId(0),
584 OutputSlabId(1),
585 1,
586 1,
587 )];
588 assert!(matches!(
589 validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&bad_artifact, 1)),
590 Err(MixedWorkProtocolError::ZeroResidentArtifactId { sequence: 1 })
591 ));
592
593 let bad_slab = [MixedWorkUnit::new(
594 2,
595 MixedWorkQueueClass::Scan,
596 MixedWorkUnitType::ScanChunk,
597 ResidentArtifactId(1),
598 OutputSlabId(0),
599 1,
600 1,
601 )];
602 assert!(matches!(
603 validate_mixed_work_protocol(&MixedWorkProtocolPlan::new(&bad_slab, 1)),
604 Err(MixedWorkProtocolError::ZeroOutputSlabId { sequence: 2 })
605 ));
606 }
607}