1use std::sync::{
6 Arc, Mutex,
7 atomic::{AtomicU64, Ordering},
8};
9
10use crate::{
11 domain::{
12 AgentError, AgentErrorKind, AgentId, AttemptId, ContentRef, DestinationRef, EffectId,
13 IdempotencyKey, MessageId, PolicyRef, PrivacyClass, RetentionClass, RetryClassification,
14 RunId, SourceKind, SourceRef, TurnId, ValidatedOutputId,
15 },
16 effect::EffectKind,
17 journal_ports::RunJournal,
18 output_delivery::{
19 OutputContentMode, OutputDeliveryDedupeRecord, OutputDeliveryId,
20 OutputDeliveryIntentRecord, OutputDeliveryJournalBase, OutputDeliveryKind,
21 OutputDeliveryPolicy, OutputDeliveryReceipt, OutputDeliveryReconciliationRecord,
22 OutputDeliveryRequest, OutputDeliveryRequirement, OutputDeliveryResultRecord,
23 OutputDispatchStatus, OutputSinkRef, ReplayRepairDecision, TerminalAppendStatus,
24 build_output_delivery_dedupe_key,
25 },
26 output_delivery_port::{OutputSinkCapabilities, OutputSinkRegistry},
27 package::RuntimePackageFingerprint,
28};
29
30#[derive(Clone)]
31pub struct OutputDeliveryService {
34 journal: Arc<dyn RunJournal>,
35 sinks: OutputSinkRegistry,
36 dedupe_index: OutputDeliveryDedupeIndex,
37 next_seq: Arc<AtomicU64>,
38}
39
40impl OutputDeliveryService {
41 pub fn new(journal: Arc<dyn RunJournal>, sinks: OutputSinkRegistry) -> Self {
45 Self {
46 journal,
47 sinks,
48 dedupe_index: OutputDeliveryDedupeIndex::default(),
49 next_seq: Arc::new(AtomicU64::new(0)),
50 }
51 }
52
53 pub fn with_dedupe_index(mut self, dedupe_index: OutputDeliveryDedupeIndex) -> Self {
57 self.dedupe_index = dedupe_index;
58 self
59 }
60
61 pub fn dispatch(
66 &self,
67 context: OutputDeliveryContext,
68 candidate: OutputDeliveryCandidate,
69 ) -> Result<OutputDeliveryOutcome, AgentError> {
70 if candidate.policy.requirement == OutputDeliveryRequirement::Disabled {
71 return Ok(OutputDeliveryOutcome::skipped(
72 OutputDispatchStatus::SkippedOptional,
73 "output delivery disabled by policy",
74 ));
75 }
76
77 let sink_ref = match self.resolve_sink_ref(&candidate) {
78 Ok(sink_ref) => sink_ref,
79 Err(error) => {
80 if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
81 return Ok(OutputDeliveryOutcome::skipped(
82 OutputDispatchStatus::SkippedOptional,
83 "optional output delivery skipped because no sink was selected",
84 ));
85 }
86 return Err(error);
87 }
88 };
89
90 let delivery_id = OutputDeliveryId::new(format!(
91 "output.delivery.{}.{}",
92 stable_fragment(context.run_id.as_str()),
93 candidate.delivery_kind.dedupe_fragment().replace(':', ".")
94 ));
95 let effect_id = EffectId::new(format!("effect.{}", delivery_id.as_str()));
96 let mut request = OutputDeliveryRequest {
97 delivery_id: delivery_id.clone(),
98 effect_id,
99 run_id: context.run_id.clone(),
100 agent_id: context.agent_id.clone(),
101 turn_id: context.turn_id.clone(),
102 attempt_id: context.attempt_id.clone(),
103 source_message_id: candidate.source_message_id.clone(),
104 validated_output_id: candidate.validated_output_id.clone(),
105 destination: candidate.destination.clone(),
106 sink_ref: sink_ref.clone(),
107 delivery_kind: candidate.delivery_kind.clone(),
108 content_mode: candidate
109 .requested_content_mode
110 .unwrap_or(candidate.policy.default_content_mode),
111 content_refs: candidate.content_refs.clone(),
112 redacted_summary: candidate.redacted_summary.clone(),
113 raw_content: candidate.raw_content.clone(),
114 privacy: candidate.privacy,
115 retention: candidate.retention,
116 policy_refs: candidate.policy.policy_refs(),
117 idempotency_key: Some(IdempotencyKey::new(format!(
118 "idempotency.{}",
119 delivery_id.as_str()
120 ))),
121 dedupe_key: crate::domain::DedupeKey::new("dedupe.output_delivery.pending"),
122 runtime_package_fingerprint: context.runtime_package_fingerprint.clone(),
123 };
124 request.dedupe_key = build_output_delivery_dedupe_key(&request);
125
126 if let Some(proof) = self.dedupe_index.completed(&request.dedupe_key)? {
127 let dedupe_record = OutputDeliveryDedupeRecord {
128 delivery_id,
129 dedupe_key: request.dedupe_key.clone(),
130 prior_delivery_id: Some(proof.delivery_id),
131 prior_external_operation_id: proof.external_operation_id,
132 prior_terminal_status: proof.status,
133 current_status: OutputDispatchStatus::Deduped,
134 redacted_summary: "output delivery skipped by completed dedupe proof".to_string(),
135 policy_refs: request.policy_refs.clone(),
136 };
137 self.journal
138 .append(dedupe_record.to_journal_record(
139 self.journal_base(
140 &context,
141 &request.destination,
142 format!("journal.{}.dedupe", request.delivery_id.as_str()),
143 ),
144 request.destination.clone(),
145 ))
146 .map_err(journal_failure)?;
147 return Ok(OutputDeliveryOutcome {
148 status: OutputDispatchStatus::Deduped,
149 request: Some(request.clone()),
150 intent_record: None,
151 result_record: None,
152 dedupe_record: Some(dedupe_record),
153 reconciliation_record: None,
154 receipt: None,
155 terminal_error: None,
156 });
157 }
158
159 let Some(sink) = self.sinks.get(&sink_ref) else {
160 if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
161 return Ok(OutputDeliveryOutcome::skipped(
162 OutputDispatchStatus::SkippedOptional,
163 "optional output delivery skipped because matching sink is missing",
164 ));
165 }
166 return self.append_host_configuration_needed(
167 context,
168 request,
169 "required output sink is missing",
170 );
171 };
172
173 let capabilities = sink.capabilities();
174 match resolve_content_mode(&candidate, &capabilities, &sink_ref) {
175 Ok(content_mode) => {
176 request.content_mode = content_mode;
177 if content_mode != OutputContentMode::RawContentIfPolicyAllows {
178 request.raw_content = None;
179 }
180 }
181 Err(error) => {
182 if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
183 return Ok(OutputDeliveryOutcome::skipped(
184 OutputDispatchStatus::SkippedOptional,
185 "optional output delivery skipped by sink capability or content policy",
186 ));
187 }
188 return self.append_host_configuration_needed(context, request, error);
189 }
190 }
191
192 if !capabilities.supports_kind(&request.delivery_kind) {
193 if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
194 return Ok(OutputDeliveryOutcome::skipped(
195 OutputDispatchStatus::SkippedOptional,
196 "optional output delivery skipped because sink cannot send this delivery kind",
197 ));
198 }
199 return self.append_host_configuration_needed(
200 context,
201 request,
202 "required output sink lacks delivery-kind capability",
203 );
204 }
205
206 let intent = OutputDeliveryIntentRecord::from_request(&request);
207 let intent_record_id = format!("journal.{}.intent", request.delivery_id.as_str());
208 let intent_journal = intent.to_journal_record(self.journal_base(
209 &context,
210 &request.destination,
211 intent_record_id.clone(),
212 ));
213 self.journal
214 .append(intent_journal)
215 .map_err(journal_failure)?;
216
217 let sink_result = if request.delivery_kind.is_chunk() {
218 sink.send_chunk(request.clone())
219 } else {
220 sink.send_final(request.clone())
221 };
222
223 match sink_result {
224 Ok(receipt) if receipt.status == OutputDispatchStatus::Completed => {
225 let result = OutputDeliveryResultRecord::completed(&request, &receipt);
226 let result_base = self.journal_base(
227 &context,
228 &request.destination,
229 format!("journal.{}.result", request.delivery_id.as_str()),
230 );
231 if let Err(error) = self
232 .journal
233 .append(result.to_journal_record(result_base.clone()))
234 {
235 return self.output_reconciliation_after_append_failure(
236 context,
237 request,
238 intent,
239 result,
240 Some(receipt),
241 intent_record_id,
242 result_base,
243 error,
244 );
245 }
246 self.dedupe_index.insert_completed(OutputDedupeProof {
247 dedupe_key: request.dedupe_key.clone(),
248 delivery_id: request.delivery_id.clone(),
249 external_operation_id: receipt.external_operation_id.clone(),
250 status: OutputDispatchStatus::Completed,
251 })?;
252 Ok(OutputDeliveryOutcome {
253 status: OutputDispatchStatus::Completed,
254 request: Some(request),
255 intent_record: Some(intent),
256 result_record: Some(result),
257 dedupe_record: None,
258 reconciliation_record: None,
259 receipt: Some(receipt),
260 terminal_error: None,
261 })
262 }
263 Ok(receipt) => {
264 let result = OutputDeliveryResultRecord::reconciliation_needed(&request, &receipt);
265 let result_base = self.journal_base(
266 &context,
267 &request.destination,
268 format!("journal.{}.result", request.delivery_id.as_str()),
269 );
270 if let Err(error) = self
271 .journal
272 .append(result.to_journal_record(result_base.clone()))
273 {
274 return self.output_reconciliation_after_append_failure(
275 context,
276 request,
277 intent,
278 result,
279 Some(receipt),
280 intent_record_id,
281 result_base,
282 error,
283 );
284 }
285 let reconciliation = OutputDeliveryReconciliationRecord {
286 delivery_id: request.delivery_id.clone(),
287 intent_record_id,
288 side_effect_kind: EffectKind::OutputDelivery,
289 idempotency_key: request.idempotency_key.clone(),
290 dedupe_key: request.dedupe_key.clone(),
291 external_operation_id: receipt.external_operation_id.clone(),
292 terminal_status: OutputDispatchStatus::ReconciliationNeeded,
293 terminal_append_status: TerminalAppendStatus::Appended,
294 reconciliation_adapter: Some(request.sink_ref.clone()),
295 unsafe_pending_reason: "sink returned unknown delivery outcome".to_string(),
296 replay_decision: ReplayRepairDecision::RequiresHostReconciliation,
297 resend_allowed: false,
298 };
299 Ok(OutputDeliveryOutcome {
300 status: OutputDispatchStatus::ReconciliationNeeded,
301 request: Some(request),
302 intent_record: Some(intent),
303 result_record: Some(result),
304 dedupe_record: None,
305 reconciliation_record: Some(reconciliation),
306 receipt: Some(receipt),
307 terminal_error: None,
308 })
309 }
310 Err(error) => {
311 let result = OutputDeliveryResultRecord::failed(
312 &request,
313 OutputDispatchStatus::Failed,
314 error.context().message,
315 error.retry(),
316 );
317 let result_base = self.journal_base(
318 &context,
319 &request.destination,
320 format!("journal.{}.result", request.delivery_id.as_str()),
321 );
322 if let Err(append_error) = self
323 .journal
324 .append(result.to_journal_record(result_base.clone()))
325 {
326 return self.output_reconciliation_after_append_failure(
327 context,
328 request,
329 intent,
330 result,
331 None,
332 intent_record_id,
333 result_base,
334 append_error,
335 );
336 }
337 Ok(OutputDeliveryOutcome {
338 status: OutputDispatchStatus::Failed,
339 request: Some(request),
340 intent_record: Some(intent),
341 result_record: Some(result),
342 dedupe_record: None,
343 reconciliation_record: None,
344 receipt: None,
345 terminal_error: Some(error),
346 })
347 }
348 }
349 }
350
351 pub fn repair_replay(
355 &self,
356 intent: &OutputDeliveryIntentRecord,
357 terminal_result: Option<&OutputDeliveryResultRecord>,
358 ) -> Result<OutputDeliveryReconciliationRecord, AgentError> {
359 if let Some(result) = terminal_result {
360 return Ok(OutputDeliveryReconciliationRecord {
361 delivery_id: intent.delivery_id.clone(),
362 intent_record_id: "journal.output_delivery.intent.replay".to_string(),
363 side_effect_kind: EffectKind::OutputDelivery,
364 idempotency_key: intent.idempotency_key.clone(),
365 dedupe_key: intent.dedupe_key.clone(),
366 external_operation_id: result.external_operation_id.clone(),
367 terminal_status: result.dispatch_status,
368 terminal_append_status: TerminalAppendStatus::Appended,
369 reconciliation_adapter: Some(intent.sink_ref.clone()),
370 unsafe_pending_reason: "terminal output delivery result already journaled"
371 .to_string(),
372 replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
373 resend_allowed: false,
374 });
375 }
376
377 if let Some(proof) = self.dedupe_index.completed(&intent.dedupe_key)? {
378 return Ok(OutputDeliveryReconciliationRecord {
379 delivery_id: intent.delivery_id.clone(),
380 intent_record_id: "journal.output_delivery.intent.replay".to_string(),
381 side_effect_kind: EffectKind::OutputDelivery,
382 idempotency_key: intent.idempotency_key.clone(),
383 dedupe_key: intent.dedupe_key.clone(),
384 external_operation_id: proof.external_operation_id,
385 terminal_status: proof.status,
386 terminal_append_status: TerminalAppendStatus::NotAttempted,
387 reconciliation_adapter: Some(intent.sink_ref.clone()),
388 unsafe_pending_reason: "repair replay found completed dedupe proof".to_string(),
389 replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
390 resend_allowed: false,
391 });
392 }
393
394 Ok(OutputDeliveryReconciliationRecord {
395 delivery_id: intent.delivery_id.clone(),
396 intent_record_id: "journal.output_delivery.intent.replay".to_string(),
397 side_effect_kind: EffectKind::OutputDelivery,
398 idempotency_key: intent.idempotency_key.clone(),
399 dedupe_key: intent.dedupe_key.clone(),
400 external_operation_id: None,
401 terminal_status: OutputDispatchStatus::ReconciliationNeeded,
402 terminal_append_status: TerminalAppendStatus::NotAttempted,
403 reconciliation_adapter: Some(intent.sink_ref.clone()),
404 unsafe_pending_reason:
405 "repair replay cannot resend output delivery without completed dedupe proof"
406 .to_string(),
407 replay_decision: ReplayRepairDecision::UnsafePending,
408 resend_allowed: false,
409 })
410 }
411
412 fn resolve_sink_ref(
413 &self,
414 candidate: &OutputDeliveryCandidate,
415 ) -> Result<OutputSinkRef, AgentError> {
416 if let Some(sink_ref) = &candidate.policy.required_sink_ref {
417 return Ok(sink_ref.clone());
418 }
419 if let Some(sink_ref) = &candidate.preferred_sink_ref {
420 return Ok(sink_ref.clone());
421 }
422 self.sinks
423 .first()
424 .map(|sink| sink.sink_ref())
425 .ok_or_else(|| {
426 AgentError::new(
427 AgentErrorKind::HostConfigurationNeeded,
428 RetryClassification::HostConfigurationNeeded,
429 "no output sink is registered for output delivery",
430 )
431 })
432 }
433
434 #[expect(
435 clippy::too_many_arguments,
436 reason = "append-failure reconciliation keeps request, intent, result, receipt, and error evidence explicit for durable recovery"
437 )]
438 fn output_reconciliation_after_append_failure(
439 &self,
440 _context: OutputDeliveryContext,
441 request: OutputDeliveryRequest,
442 intent: OutputDeliveryIntentRecord,
443 result: OutputDeliveryResultRecord,
444 receipt: Option<OutputDeliveryReceipt>,
445 intent_record_id: String,
446 mut result_base: OutputDeliveryJournalBase,
447 append_error: AgentError,
448 ) -> Result<OutputDeliveryOutcome, AgentError> {
449 let reconciliation = OutputDeliveryReconciliationRecord {
450 delivery_id: request.delivery_id.clone(),
451 intent_record_id,
452 side_effect_kind: EffectKind::OutputDelivery,
453 idempotency_key: request.idempotency_key.clone(),
454 dedupe_key: request.dedupe_key.clone(),
455 external_operation_id: receipt
456 .as_ref()
457 .and_then(|receipt| receipt.external_operation_id.clone()),
458 terminal_status: OutputDispatchStatus::ReconciliationNeeded,
459 terminal_append_status: TerminalAppendStatus::AppendFailed,
460 reconciliation_adapter: Some(request.sink_ref.clone()),
461 unsafe_pending_reason: format!(
462 "output sink was contacted but terminal result append failed: {}",
463 append_error.context().message
464 ),
465 replay_decision: ReplayRepairDecision::RequiresHostReconciliation,
466 resend_allowed: false,
467 };
468 result_base.record_id = format!("journal.{}.reconciliation", request.delivery_id.as_str());
469 let recovery_record =
470 reconciliation.to_journal_record(result_base, request.destination.clone());
471 self.journal
472 .append(recovery_record)
473 .map_err(|recovery_error| {
474 AgentError::new(
475 AgentErrorKind::RecoveryRepairNeeded,
476 RetryClassification::RepairNeeded,
477 format!(
478 "output delivery result append failed and reconciliation append failed: {}",
479 recovery_error.context().message
480 ),
481 )
482 .with_destination(request.destination.clone())
483 })?;
484 let destination = request.destination.clone();
485 Ok(OutputDeliveryOutcome {
486 status: OutputDispatchStatus::ReconciliationNeeded,
487 request: Some(request),
488 intent_record: Some(intent),
489 result_record: Some(result),
490 dedupe_record: None,
491 reconciliation_record: Some(reconciliation),
492 receipt,
493 terminal_error: Some(
494 AgentError::new(
495 AgentErrorKind::RecoveryRepairNeeded,
496 RetryClassification::RepairNeeded,
497 "output delivery terminal result append failed; replay requires reconciliation",
498 )
499 .with_destination(destination),
500 ),
501 })
502 }
503
504 fn append_host_configuration_needed(
505 &self,
506 context: OutputDeliveryContext,
507 request: OutputDeliveryRequest,
508 message: impl Into<String>,
509 ) -> Result<OutputDeliveryOutcome, AgentError> {
510 let message = message.into();
511 let intent = OutputDeliveryIntentRecord::from_request(&request);
512 self.journal
513 .append(intent.to_journal_record(self.journal_base(
514 &context,
515 &request.destination,
516 format!("journal.{}.intent", request.delivery_id.as_str()),
517 )))
518 .map_err(journal_failure)?;
519 let result = OutputDeliveryResultRecord::failed(
520 &request,
521 OutputDispatchStatus::HostConfigurationNeeded,
522 message.clone(),
523 RetryClassification::HostConfigurationNeeded,
524 );
525 self.journal
526 .append(result.to_journal_record(self.journal_base(
527 &context,
528 &request.destination,
529 format!("journal.{}.result", request.delivery_id.as_str()),
530 )))?;
531 let error = AgentError::new(
532 AgentErrorKind::HostConfigurationNeeded,
533 RetryClassification::HostConfigurationNeeded,
534 message,
535 )
536 .with_destination(request.destination.clone());
537 Ok(OutputDeliveryOutcome {
538 status: OutputDispatchStatus::HostConfigurationNeeded,
539 request: Some(request),
540 intent_record: Some(intent),
541 result_record: Some(result),
542 dedupe_record: None,
543 reconciliation_record: None,
544 receipt: None,
545 terminal_error: Some(error),
546 })
547 }
548
549 fn journal_base(
550 &self,
551 context: &OutputDeliveryContext,
552 destination: &DestinationRef,
553 record_id: String,
554 ) -> OutputDeliveryJournalBase {
555 OutputDeliveryJournalBase {
556 journal_seq: self.next_seq.fetch_add(1, Ordering::SeqCst) + 1,
557 record_id,
558 run_id: context.run_id.clone(),
559 agent_id: context.agent_id.clone(),
560 turn_id: context.turn_id.clone(),
561 attempt_id: context.attempt_id.clone(),
562 source: context.source.clone(),
563 destination: destination.clone(),
564 timestamp_millis: 0,
565 runtime_package_fingerprint: context.runtime_package_fingerprint.clone(),
566 redaction_policy_id: "policy.redaction.default".to_string(),
567 }
568 }
569}
570
571#[derive(Clone, Debug, Eq, PartialEq)]
572pub struct OutputDeliveryContext {
575 pub run_id: RunId,
577 pub agent_id: AgentId,
579 pub turn_id: Option<TurnId>,
581 pub attempt_id: Option<AttemptId>,
584 pub source: SourceRef,
587 pub runtime_package_fingerprint: RuntimePackageFingerprint,
591}
592
593impl OutputDeliveryContext {
594 pub fn new(
598 run_id: RunId,
599 agent_id: AgentId,
600 runtime_package_fingerprint: RuntimePackageFingerprint,
601 ) -> Self {
602 Self {
603 run_id,
604 agent_id,
605 turn_id: None,
606 attempt_id: None,
607 source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.output_delivery"),
608 runtime_package_fingerprint,
609 }
610 }
611}
612
613#[derive(Clone, Debug, Eq, PartialEq)]
614pub struct OutputDeliveryCandidate {
617 pub destination: DestinationRef,
620 pub preferred_sink_ref: Option<OutputSinkRef>,
623 pub delivery_kind: OutputDeliveryKind,
626 pub source_message_id: Option<MessageId>,
628 pub validated_output_id: Option<ValidatedOutputId>,
630 pub content_refs: Vec<ContentRef>,
633 pub redacted_summary: String,
635 pub raw_content: Option<String>,
638 pub requested_content_mode: Option<OutputContentMode>,
641 pub privacy: PrivacyClass,
644 pub retention: RetentionClass,
647 pub policy: OutputDeliveryPolicy,
649}
650
651impl OutputDeliveryCandidate {
652 pub fn final_message(
656 destination: DestinationRef,
657 sink_ref: OutputSinkRef,
658 content_ref: ContentRef,
659 policy_ref: PolicyRef,
660 ) -> Self {
661 Self {
662 destination,
663 preferred_sink_ref: Some(sink_ref.clone()),
664 delivery_kind: OutputDeliveryKind::FinalMessage,
665 source_message_id: Some(MessageId::new("message.output_delivery.final")),
666 validated_output_id: None,
667 content_refs: vec![content_ref],
668 redacted_summary: "final assistant message ready for output delivery".to_string(),
669 raw_content: None,
670 requested_content_mode: None,
671 privacy: PrivacyClass::ContentRefsOnly,
672 retention: RetentionClass::RunScoped,
673 policy: OutputDeliveryPolicy::required(policy_ref, sink_ref),
674 }
675 }
676}
677
678#[derive(Clone, Debug)]
679pub struct OutputDeliveryOutcome {
682 pub status: OutputDispatchStatus,
684 pub request: Option<OutputDeliveryRequest>,
686 pub intent_record: Option<OutputDeliveryIntentRecord>,
689 pub result_record: Option<OutputDeliveryResultRecord>,
692 pub dedupe_record: Option<OutputDeliveryDedupeRecord>,
695 pub reconciliation_record: Option<OutputDeliveryReconciliationRecord>,
698 pub receipt: Option<OutputDeliveryReceipt>,
701 pub terminal_error: Option<AgentError>,
704}
705
706impl OutputDeliveryOutcome {
707 fn skipped(status: OutputDispatchStatus, summary: impl Into<String>) -> Self {
708 let _ = summary.into();
709 Self {
710 status,
711 request: None,
712 intent_record: None,
713 result_record: None,
714 dedupe_record: None,
715 reconciliation_record: None,
716 receipt: None,
717 terminal_error: None,
718 }
719 }
720}
721
722#[derive(Clone, Debug, Default)]
723pub struct OutputDeliveryDedupeIndex {
726 completed: Arc<Mutex<std::collections::BTreeMap<crate::domain::DedupeKey, OutputDedupeProof>>>,
727}
728
729impl OutputDeliveryDedupeIndex {
730 pub fn insert_completed(&self, proof: OutputDedupeProof) -> Result<(), AgentError> {
734 self.completed
735 .lock()
736 .map_err(|_| AgentError::contract_violation("output dedupe index lock poisoned"))?
737 .insert(proof.dedupe_key.clone(), proof);
738 Ok(())
739 }
740
741 pub fn completed(
745 &self,
746 dedupe_key: &crate::domain::DedupeKey,
747 ) -> Result<Option<OutputDedupeProof>, AgentError> {
748 Ok(self
749 .completed
750 .lock()
751 .map_err(|_| AgentError::contract_violation("output dedupe index lock poisoned"))?
752 .get(dedupe_key)
753 .cloned())
754 }
755}
756
757#[derive(Clone, Debug, Eq, PartialEq)]
758pub struct OutputDedupeProof {
761 pub dedupe_key: crate::domain::DedupeKey,
764 pub delivery_id: OutputDeliveryId,
766 pub external_operation_id: Option<String>,
769 pub status: OutputDispatchStatus,
771}
772
773fn resolve_content_mode(
774 candidate: &OutputDeliveryCandidate,
775 capabilities: &OutputSinkCapabilities,
776 sink_ref: &OutputSinkRef,
777) -> Result<OutputContentMode, String> {
778 let requested = candidate
779 .requested_content_mode
780 .unwrap_or(candidate.policy.default_content_mode);
781 let fallback_modes = [
782 requested,
783 OutputContentMode::RedactedSummary,
784 OutputContentMode::ContentRefsOnly,
785 ];
786
787 for mode in fallback_modes {
788 if !candidate.policy.allows_mode(mode) || !capabilities.supports_content_mode(mode) {
789 continue;
790 }
791 if mode == OutputContentMode::RawContentIfPolicyAllows {
792 let Some(raw_content) = candidate.raw_content.as_ref() else {
793 continue;
794 };
795 if !candidate
796 .policy
797 .raw_content_policy
798 .allows_raw_for(sink_ref, raw_content.len())
799 {
800 continue;
801 }
802 }
803 return Ok(mode);
804 }
805
806 Err(
807 "output sink lacks required content-mode capability or policy denied raw content"
808 .to_string(),
809 )
810}
811
812fn journal_failure(error: AgentError) -> AgentError {
813 AgentError::new(
814 AgentErrorKind::JournalFailure,
815 RetryClassification::RepairNeeded,
816 error.context().message,
817 )
818}
819
820fn stable_fragment(value: &str) -> String {
821 use sha2::{Digest, Sha256};
822
823 let digest = Sha256::digest(value.as_bytes());
824 digest[..6]
825 .iter()
826 .map(|byte| format!("{byte:02x}"))
827 .collect::<String>()
828}