Skip to main content

agent_sdk_core/application/
output_delivery.rs

//! Output delivery coordination over destination refs and sinks. Use this module to
//! send final or streaming output through host-provided sinks with dedupe and journal
//! evidence. Dispatch may call sinks and append delivery intent, result, dedupe, and
//! reconciliation records to the run journal.
5use std::sync::{
6    Arc, Mutex,
7    atomic::{AtomicU64, Ordering},
8};
9
10use crate::{
11    domain::{
12        AgentError, AgentErrorKind, AgentId, AttemptId, ContentRef, DestinationRef, EffectId,
13        IdempotencyKey, MessageId, PolicyRef, PrivacyClass, RetentionClass, RetryClassification,
14        RunId, SourceKind, SourceRef, TurnId, ValidatedOutputId,
15    },
16    effect::EffectKind,
17    journal_ports::RunJournal,
18    output_delivery::{
19        OutputContentMode, OutputDeliveryDedupeRecord, OutputDeliveryId,
20        OutputDeliveryIntentRecord, OutputDeliveryJournalBase, OutputDeliveryKind,
21        OutputDeliveryPolicy, OutputDeliveryReceipt, OutputDeliveryReconciliationRecord,
22        OutputDeliveryRequest, OutputDeliveryRequirement, OutputDeliveryResultRecord,
23        OutputDispatchStatus, OutputSinkRef, ReplayRepairDecision, TerminalAppendStatus,
24        build_output_delivery_dedupe_key,
25    },
26    output_delivery_port::{OutputSinkCapabilities, OutputSinkRegistry},
27    package::RuntimePackageFingerprint,
28};
29
30#[derive(Clone)]
31/// Holds output delivery service application-layer state or configuration.
32/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
33pub struct OutputDeliveryService {
34    journal: Arc<dyn RunJournal>,
35    sinks: OutputSinkRegistry,
36    dedupe_index: OutputDeliveryDedupeIndex,
37    next_seq: Arc<AtomicU64>,
38}
39
40impl OutputDeliveryService {
41    /// Creates a new application::output_delivery value with explicit
42    /// caller-provided inputs. This constructor is data-only and
43    /// performs no I/O or external side effects.
44    pub fn new(journal: Arc<dyn RunJournal>, sinks: OutputSinkRegistry) -> Self {
45        Self {
46            journal,
47            sinks,
48            dedupe_index: OutputDeliveryDedupeIndex::default(),
49            next_seq: Arc::new(AtomicU64::new(0)),
50        }
51    }
52
53    /// Returns this value with its dedupe index setting replaced. The
54    /// method follows builder-style data construction and does not
55    /// execute external work.
56    pub fn with_dedupe_index(mut self, dedupe_index: OutputDeliveryDedupeIndex) -> Self {
57        self.dedupe_index = dedupe_index;
58        self
59    }
60
61    /// Coordinates dispatch for the application::output_delivery contract.
62    /// This may call configured ports and update runtime/journal/event state
63    /// according to the surrounding module, without introducing a parallel
64    /// behavior path.
65    pub fn dispatch(
66        &self,
67        context: OutputDeliveryContext,
68        candidate: OutputDeliveryCandidate,
69    ) -> Result<OutputDeliveryOutcome, AgentError> {
70        if candidate.policy.requirement == OutputDeliveryRequirement::Disabled {
71            return Ok(OutputDeliveryOutcome::skipped(
72                OutputDispatchStatus::SkippedOptional,
73                "output delivery disabled by policy",
74            ));
75        }
76
77        let sink_ref = match self.resolve_sink_ref(&candidate) {
78            Ok(sink_ref) => sink_ref,
79            Err(error) => {
80                if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
81                    return Ok(OutputDeliveryOutcome::skipped(
82                        OutputDispatchStatus::SkippedOptional,
83                        "optional output delivery skipped because no sink was selected",
84                    ));
85                }
86                return Err(error);
87            }
88        };
89
90        let delivery_id = OutputDeliveryId::new(format!(
91            "output.delivery.{}.{}",
92            stable_fragment(context.run_id.as_str()),
93            candidate.delivery_kind.dedupe_fragment().replace(':', ".")
94        ));
95        let effect_id = EffectId::new(format!("effect.{}", delivery_id.as_str()));
96        let mut request = OutputDeliveryRequest {
97            delivery_id: delivery_id.clone(),
98            effect_id,
99            run_id: context.run_id.clone(),
100            agent_id: context.agent_id.clone(),
101            turn_id: context.turn_id.clone(),
102            attempt_id: context.attempt_id.clone(),
103            source_message_id: candidate.source_message_id.clone(),
104            validated_output_id: candidate.validated_output_id.clone(),
105            destination: candidate.destination.clone(),
106            sink_ref: sink_ref.clone(),
107            delivery_kind: candidate.delivery_kind.clone(),
108            content_mode: candidate
109                .requested_content_mode
110                .unwrap_or(candidate.policy.default_content_mode),
111            content_refs: candidate.content_refs.clone(),
112            redacted_summary: candidate.redacted_summary.clone(),
113            raw_content: candidate.raw_content.clone(),
114            privacy: candidate.privacy,
115            retention: candidate.retention,
116            policy_refs: candidate.policy.policy_refs(),
117            idempotency_key: Some(IdempotencyKey::new(format!(
118                "idempotency.{}",
119                delivery_id.as_str()
120            ))),
121            dedupe_key: crate::domain::DedupeKey::new("dedupe.output_delivery.pending"),
122            runtime_package_fingerprint: context.runtime_package_fingerprint.clone(),
123        };
124        request.dedupe_key = build_output_delivery_dedupe_key(&request);
125
126        if let Some(proof) = self.dedupe_index.completed(&request.dedupe_key)? {
127            let dedupe_record = OutputDeliveryDedupeRecord {
128                delivery_id,
129                dedupe_key: request.dedupe_key.clone(),
130                prior_delivery_id: Some(proof.delivery_id),
131                prior_external_operation_id: proof.external_operation_id,
132                prior_terminal_status: proof.status,
133                current_status: OutputDispatchStatus::Deduped,
134                redacted_summary: "output delivery skipped by completed dedupe proof".to_string(),
135                policy_refs: request.policy_refs.clone(),
136            };
137            self.journal
138                .append(dedupe_record.to_journal_record(
139                    self.journal_base(
140                        &context,
141                        &request.destination,
142                        format!("journal.{}.dedupe", request.delivery_id.as_str()),
143                    ),
144                    request.destination.clone(),
145                ))
146                .map_err(journal_failure)?;
147            return Ok(OutputDeliveryOutcome {
148                status: OutputDispatchStatus::Deduped,
149                request: Some(request.clone()),
150                intent_record: None,
151                result_record: None,
152                dedupe_record: Some(dedupe_record),
153                reconciliation_record: None,
154                receipt: None,
155                terminal_error: None,
156            });
157        }
158
159        let Some(sink) = self.sinks.get(&sink_ref) else {
160            if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
161                return Ok(OutputDeliveryOutcome::skipped(
162                    OutputDispatchStatus::SkippedOptional,
163                    "optional output delivery skipped because matching sink is missing",
164                ));
165            }
166            return self.append_host_configuration_needed(
167                context,
168                request,
169                "required output sink is missing",
170            );
171        };
172
173        let capabilities = sink.capabilities();
174        match resolve_content_mode(&candidate, &capabilities, &sink_ref) {
175            Ok(content_mode) => {
176                request.content_mode = content_mode;
177                if content_mode != OutputContentMode::RawContentIfPolicyAllows {
178                    request.raw_content = None;
179                }
180            }
181            Err(error) => {
182                if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
183                    return Ok(OutputDeliveryOutcome::skipped(
184                        OutputDispatchStatus::SkippedOptional,
185                        "optional output delivery skipped by sink capability or content policy",
186                    ));
187                }
188                return self.append_host_configuration_needed(context, request, error);
189            }
190        }
191
192        if !capabilities.supports_kind(&request.delivery_kind) {
193            if candidate.policy.requirement == OutputDeliveryRequirement::Optional {
194                return Ok(OutputDeliveryOutcome::skipped(
195                    OutputDispatchStatus::SkippedOptional,
196                    "optional output delivery skipped because sink cannot send this delivery kind",
197                ));
198            }
199            return self.append_host_configuration_needed(
200                context,
201                request,
202                "required output sink lacks delivery-kind capability",
203            );
204        }
205
206        let intent = OutputDeliveryIntentRecord::from_request(&request);
207        let intent_record_id = format!("journal.{}.intent", request.delivery_id.as_str());
208        let intent_journal = intent.to_journal_record(self.journal_base(
209            &context,
210            &request.destination,
211            intent_record_id.clone(),
212        ));
213        self.journal
214            .append(intent_journal)
215            .map_err(journal_failure)?;
216
217        let sink_result = if request.delivery_kind.is_chunk() {
218            sink.send_chunk(request.clone())
219        } else {
220            sink.send_final(request.clone())
221        };
222
223        match sink_result {
224            Ok(receipt) if receipt.status == OutputDispatchStatus::Completed => {
225                let result = OutputDeliveryResultRecord::completed(&request, &receipt);
226                let result_base = self.journal_base(
227                    &context,
228                    &request.destination,
229                    format!("journal.{}.result", request.delivery_id.as_str()),
230                );
231                if let Err(error) = self
232                    .journal
233                    .append(result.to_journal_record(result_base.clone()))
234                {
235                    return self.output_reconciliation_after_append_failure(
236                        context,
237                        request,
238                        intent,
239                        result,
240                        Some(receipt),
241                        intent_record_id,
242                        result_base,
243                        error,
244                    );
245                }
246                self.dedupe_index.insert_completed(OutputDedupeProof {
247                    dedupe_key: request.dedupe_key.clone(),
248                    delivery_id: request.delivery_id.clone(),
249                    external_operation_id: receipt.external_operation_id.clone(),
250                    status: OutputDispatchStatus::Completed,
251                })?;
252                Ok(OutputDeliveryOutcome {
253                    status: OutputDispatchStatus::Completed,
254                    request: Some(request),
255                    intent_record: Some(intent),
256                    result_record: Some(result),
257                    dedupe_record: None,
258                    reconciliation_record: None,
259                    receipt: Some(receipt),
260                    terminal_error: None,
261                })
262            }
263            Ok(receipt) => {
264                let result = OutputDeliveryResultRecord::reconciliation_needed(&request, &receipt);
265                let result_base = self.journal_base(
266                    &context,
267                    &request.destination,
268                    format!("journal.{}.result", request.delivery_id.as_str()),
269                );
270                if let Err(error) = self
271                    .journal
272                    .append(result.to_journal_record(result_base.clone()))
273                {
274                    return self.output_reconciliation_after_append_failure(
275                        context,
276                        request,
277                        intent,
278                        result,
279                        Some(receipt),
280                        intent_record_id,
281                        result_base,
282                        error,
283                    );
284                }
285                let reconciliation = OutputDeliveryReconciliationRecord {
286                    delivery_id: request.delivery_id.clone(),
287                    intent_record_id,
288                    side_effect_kind: EffectKind::OutputDelivery,
289                    idempotency_key: request.idempotency_key.clone(),
290                    dedupe_key: request.dedupe_key.clone(),
291                    external_operation_id: receipt.external_operation_id.clone(),
292                    terminal_status: OutputDispatchStatus::ReconciliationNeeded,
293                    terminal_append_status: TerminalAppendStatus::Appended,
294                    reconciliation_adapter: Some(request.sink_ref.clone()),
295                    unsafe_pending_reason: "sink returned unknown delivery outcome".to_string(),
296                    replay_decision: ReplayRepairDecision::RequiresHostReconciliation,
297                    resend_allowed: false,
298                };
299                Ok(OutputDeliveryOutcome {
300                    status: OutputDispatchStatus::ReconciliationNeeded,
301                    request: Some(request),
302                    intent_record: Some(intent),
303                    result_record: Some(result),
304                    dedupe_record: None,
305                    reconciliation_record: Some(reconciliation),
306                    receipt: Some(receipt),
307                    terminal_error: None,
308                })
309            }
310            Err(error) => {
311                let result = OutputDeliveryResultRecord::failed(
312                    &request,
313                    OutputDispatchStatus::Failed,
314                    error.context().message,
315                    error.retry(),
316                );
317                let result_base = self.journal_base(
318                    &context,
319                    &request.destination,
320                    format!("journal.{}.result", request.delivery_id.as_str()),
321                );
322                if let Err(append_error) = self
323                    .journal
324                    .append(result.to_journal_record(result_base.clone()))
325                {
326                    return self.output_reconciliation_after_append_failure(
327                        context,
328                        request,
329                        intent,
330                        result,
331                        None,
332                        intent_record_id,
333                        result_base,
334                        append_error,
335                    );
336                }
337                Ok(OutputDeliveryOutcome {
338                    status: OutputDispatchStatus::Failed,
339                    request: Some(request),
340                    intent_record: Some(intent),
341                    result_record: Some(result),
342                    dedupe_record: None,
343                    reconciliation_record: None,
344                    receipt: None,
345                    terminal_error: Some(error),
346                })
347            }
348        }
349    }
350
351    /// Operates on in-memory or journal-derived application::output_delivery
352    /// state for diagnostics and repair evidence. It does not create a second
353    /// run loop or product workflow owner.
354    pub fn repair_replay(
355        &self,
356        intent: &OutputDeliveryIntentRecord,
357        terminal_result: Option<&OutputDeliveryResultRecord>,
358    ) -> Result<OutputDeliveryReconciliationRecord, AgentError> {
359        if let Some(result) = terminal_result {
360            return Ok(OutputDeliveryReconciliationRecord {
361                delivery_id: intent.delivery_id.clone(),
362                intent_record_id: "journal.output_delivery.intent.replay".to_string(),
363                side_effect_kind: EffectKind::OutputDelivery,
364                idempotency_key: intent.idempotency_key.clone(),
365                dedupe_key: intent.dedupe_key.clone(),
366                external_operation_id: result.external_operation_id.clone(),
367                terminal_status: result.dispatch_status,
368                terminal_append_status: TerminalAppendStatus::Appended,
369                reconciliation_adapter: Some(intent.sink_ref.clone()),
370                unsafe_pending_reason: "terminal output delivery result already journaled"
371                    .to_string(),
372                replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
373                resend_allowed: false,
374            });
375        }
376
377        if let Some(proof) = self.dedupe_index.completed(&intent.dedupe_key)? {
378            return Ok(OutputDeliveryReconciliationRecord {
379                delivery_id: intent.delivery_id.clone(),
380                intent_record_id: "journal.output_delivery.intent.replay".to_string(),
381                side_effect_kind: EffectKind::OutputDelivery,
382                idempotency_key: intent.idempotency_key.clone(),
383                dedupe_key: intent.dedupe_key.clone(),
384                external_operation_id: proof.external_operation_id,
385                terminal_status: proof.status,
386                terminal_append_status: TerminalAppendStatus::NotAttempted,
387                reconciliation_adapter: Some(intent.sink_ref.clone()),
388                unsafe_pending_reason: "repair replay found completed dedupe proof".to_string(),
389                replay_decision: ReplayRepairDecision::CompletedByDedupeProof,
390                resend_allowed: false,
391            });
392        }
393
394        Ok(OutputDeliveryReconciliationRecord {
395            delivery_id: intent.delivery_id.clone(),
396            intent_record_id: "journal.output_delivery.intent.replay".to_string(),
397            side_effect_kind: EffectKind::OutputDelivery,
398            idempotency_key: intent.idempotency_key.clone(),
399            dedupe_key: intent.dedupe_key.clone(),
400            external_operation_id: None,
401            terminal_status: OutputDispatchStatus::ReconciliationNeeded,
402            terminal_append_status: TerminalAppendStatus::NotAttempted,
403            reconciliation_adapter: Some(intent.sink_ref.clone()),
404            unsafe_pending_reason:
405                "repair replay cannot resend output delivery without completed dedupe proof"
406                    .to_string(),
407            replay_decision: ReplayRepairDecision::UnsafePending,
408            resend_allowed: false,
409        })
410    }
411
412    fn resolve_sink_ref(
413        &self,
414        candidate: &OutputDeliveryCandidate,
415    ) -> Result<OutputSinkRef, AgentError> {
416        if let Some(sink_ref) = &candidate.policy.required_sink_ref {
417            return Ok(sink_ref.clone());
418        }
419        if let Some(sink_ref) = &candidate.preferred_sink_ref {
420            return Ok(sink_ref.clone());
421        }
422        self.sinks
423            .first()
424            .map(|sink| sink.sink_ref())
425            .ok_or_else(|| {
426                AgentError::new(
427                    AgentErrorKind::HostConfigurationNeeded,
428                    RetryClassification::HostConfigurationNeeded,
429                    "no output sink is registered for output delivery",
430                )
431            })
432    }
433
434    #[expect(
435        clippy::too_many_arguments,
436        reason = "append-failure reconciliation keeps request, intent, result, receipt, and error evidence explicit for durable recovery"
437    )]
438    fn output_reconciliation_after_append_failure(
439        &self,
440        _context: OutputDeliveryContext,
441        request: OutputDeliveryRequest,
442        intent: OutputDeliveryIntentRecord,
443        result: OutputDeliveryResultRecord,
444        receipt: Option<OutputDeliveryReceipt>,
445        intent_record_id: String,
446        mut result_base: OutputDeliveryJournalBase,
447        append_error: AgentError,
448    ) -> Result<OutputDeliveryOutcome, AgentError> {
449        let reconciliation = OutputDeliveryReconciliationRecord {
450            delivery_id: request.delivery_id.clone(),
451            intent_record_id,
452            side_effect_kind: EffectKind::OutputDelivery,
453            idempotency_key: request.idempotency_key.clone(),
454            dedupe_key: request.dedupe_key.clone(),
455            external_operation_id: receipt
456                .as_ref()
457                .and_then(|receipt| receipt.external_operation_id.clone()),
458            terminal_status: OutputDispatchStatus::ReconciliationNeeded,
459            terminal_append_status: TerminalAppendStatus::AppendFailed,
460            reconciliation_adapter: Some(request.sink_ref.clone()),
461            unsafe_pending_reason: format!(
462                "output sink was contacted but terminal result append failed: {}",
463                append_error.context().message
464            ),
465            replay_decision: ReplayRepairDecision::RequiresHostReconciliation,
466            resend_allowed: false,
467        };
468        result_base.record_id = format!("journal.{}.reconciliation", request.delivery_id.as_str());
469        let recovery_record =
470            reconciliation.to_journal_record(result_base, request.destination.clone());
471        self.journal
472            .append(recovery_record)
473            .map_err(|recovery_error| {
474                AgentError::new(
475                    AgentErrorKind::RecoveryRepairNeeded,
476                    RetryClassification::RepairNeeded,
477                    format!(
478                        "output delivery result append failed and reconciliation append failed: {}",
479                        recovery_error.context().message
480                    ),
481                )
482                .with_destination(request.destination.clone())
483            })?;
484        let destination = request.destination.clone();
485        Ok(OutputDeliveryOutcome {
486            status: OutputDispatchStatus::ReconciliationNeeded,
487            request: Some(request),
488            intent_record: Some(intent),
489            result_record: Some(result),
490            dedupe_record: None,
491            reconciliation_record: Some(reconciliation),
492            receipt,
493            terminal_error: Some(
494                AgentError::new(
495                    AgentErrorKind::RecoveryRepairNeeded,
496                    RetryClassification::RepairNeeded,
497                    "output delivery terminal result append failed; replay requires reconciliation",
498                )
499                .with_destination(destination),
500            ),
501        })
502    }
503
504    fn append_host_configuration_needed(
505        &self,
506        context: OutputDeliveryContext,
507        request: OutputDeliveryRequest,
508        message: impl Into<String>,
509    ) -> Result<OutputDeliveryOutcome, AgentError> {
510        let message = message.into();
511        let intent = OutputDeliveryIntentRecord::from_request(&request);
512        self.journal
513            .append(intent.to_journal_record(self.journal_base(
514                &context,
515                &request.destination,
516                format!("journal.{}.intent", request.delivery_id.as_str()),
517            )))
518            .map_err(journal_failure)?;
519        let result = OutputDeliveryResultRecord::failed(
520            &request,
521            OutputDispatchStatus::HostConfigurationNeeded,
522            message.clone(),
523            RetryClassification::HostConfigurationNeeded,
524        );
525        self.journal
526            .append(result.to_journal_record(self.journal_base(
527                &context,
528                &request.destination,
529                format!("journal.{}.result", request.delivery_id.as_str()),
530            )))?;
531        let error = AgentError::new(
532            AgentErrorKind::HostConfigurationNeeded,
533            RetryClassification::HostConfigurationNeeded,
534            message,
535        )
536        .with_destination(request.destination.clone());
537        Ok(OutputDeliveryOutcome {
538            status: OutputDispatchStatus::HostConfigurationNeeded,
539            request: Some(request),
540            intent_record: Some(intent),
541            result_record: Some(result),
542            dedupe_record: None,
543            reconciliation_record: None,
544            receipt: None,
545            terminal_error: Some(error),
546        })
547    }
548
549    fn journal_base(
550        &self,
551        context: &OutputDeliveryContext,
552        destination: &DestinationRef,
553        record_id: String,
554    ) -> OutputDeliveryJournalBase {
555        OutputDeliveryJournalBase {
556            journal_seq: self.next_seq.fetch_add(1, Ordering::SeqCst) + 1,
557            record_id,
558            run_id: context.run_id.clone(),
559            agent_id: context.agent_id.clone(),
560            turn_id: context.turn_id.clone(),
561            attempt_id: context.attempt_id.clone(),
562            source: context.source.clone(),
563            destination: destination.clone(),
564            timestamp_millis: 0,
565            runtime_package_fingerprint: context.runtime_package_fingerprint.clone(),
566            redaction_policy_id: "policy.redaction.default".to_string(),
567        }
568    }
569}
570
571#[derive(Clone, Debug, Eq, PartialEq)]
572/// Holds output delivery context application-layer state or configuration.
573/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
574pub struct OutputDeliveryContext {
575    /// Run identifier used for lineage, filtering, replay, and dedupe.
576    pub run_id: RunId,
577    /// Agent identifier used for lineage, filtering, and ownership checks.
578    pub agent_id: AgentId,
579    /// Turn identifier for one loop turn within a run.
580    pub turn_id: Option<TurnId>,
581    /// Attempt identifier for retry, repair, provider, or tool execution
582    /// evidence.
583    pub attempt_id: Option<AttemptId>,
584    /// Source label or ref for this item; it is metadata and does not fetch
585    /// content by itself.
586    pub source: SourceRef,
587    /// Fingerprint of the runtime package snapshot in force when this value was produced.
588    /// Use it for replay, dedupe, and package-lineage checks; the field is evidence and does
589    /// not execute package behavior.
590    pub runtime_package_fingerprint: RuntimePackageFingerprint,
591}
592
593impl OutputDeliveryContext {
594    /// Creates a new application::output_delivery value with explicit
595    /// caller-provided inputs. This constructor is data-only and
596    /// performs no I/O or external side effects.
597    pub fn new(
598        run_id: RunId,
599        agent_id: AgentId,
600        runtime_package_fingerprint: RuntimePackageFingerprint,
601    ) -> Self {
602        Self {
603            run_id,
604            agent_id,
605            turn_id: None,
606            attempt_id: None,
607            source: SourceRef::with_kind(SourceKind::Sdk, "source.sdk.output_delivery"),
608            runtime_package_fingerprint,
609        }
610    }
611}
612
613#[derive(Clone, Debug, Eq, PartialEq)]
614/// Holds output delivery candidate application-layer state or configuration.
615/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
616pub struct OutputDeliveryCandidate {
617    /// Destination label or ref for this item; it is metadata and does not
618    /// deliver content by itself.
619    pub destination: DestinationRef,
620    /// Typed preferred sink ref reference. Resolving or executing it is a
621    /// separate policy-gated step.
622    pub preferred_sink_ref: Option<OutputSinkRef>,
623    /// Output delivery setting or policy.
624    /// Delivery coordinators use it to decide sink mode, dedupe, and required evidence.
625    pub delivery_kind: OutputDeliveryKind,
626    /// Stable source message id used for typed lineage, lookup, or dedupe.
627    pub source_message_id: Option<MessageId>,
628    /// Stable validated output id used for typed lineage, lookup, or dedupe.
629    pub validated_output_id: Option<ValidatedOutputId>,
630    /// Content references associated with this record; resolving them is a
631    /// separate policy-gated step.
632    pub content_refs: Vec<ContentRef>,
633    /// Redacted human-readable summary safe for events, telemetry, and logs.
634    pub redacted_summary: String,
635    /// Raw content or raw-content control for this value.
636    /// Use it only when policy explicitly allows raw content capture or delivery.
637    pub raw_content: Option<String>,
638    /// Optional requested content mode value.
639    /// When absent, callers should use the documented default or skip that optional behavior.
640    pub requested_content_mode: Option<OutputContentMode>,
641    /// Privacy class used for projection, telemetry, and raw-content access
642    /// decisions.
643    pub privacy: PrivacyClass,
644    /// Retention class used by hosts and sinks when storing or exporting this
645    /// item.
646    pub retention: RetentionClass,
647    /// Policy used by this record or request.
648    pub policy: OutputDeliveryPolicy,
649}
650
651impl OutputDeliveryCandidate {
652    /// Builds the final message value.
653    /// This is data construction and performs no I/O, journal append, event publication, or
654    /// process work.
655    pub fn final_message(
656        destination: DestinationRef,
657        sink_ref: OutputSinkRef,
658        content_ref: ContentRef,
659        policy_ref: PolicyRef,
660    ) -> Self {
661        Self {
662            destination,
663            preferred_sink_ref: Some(sink_ref.clone()),
664            delivery_kind: OutputDeliveryKind::FinalMessage,
665            source_message_id: Some(MessageId::new("message.output_delivery.final")),
666            validated_output_id: None,
667            content_refs: vec![content_ref],
668            redacted_summary: "final assistant message ready for output delivery".to_string(),
669            raw_content: None,
670            requested_content_mode: None,
671            privacy: PrivacyClass::ContentRefsOnly,
672            retention: RetentionClass::RunScoped,
673            policy: OutputDeliveryPolicy::required(policy_ref, sink_ref),
674        }
675    }
676}
677
678#[derive(Clone, Debug)]
679/// Holds output delivery outcome application-layer state or configuration.
680/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
681pub struct OutputDeliveryOutcome {
682    /// Finite status for this record or lifecycle stage.
683    pub status: OutputDispatchStatus,
684    /// Request DTO or resolved call that triggered this operation.
685    pub request: Option<OutputDeliveryRequest>,
686    /// Optional intent record value.
687    /// When absent, callers should use the documented default or skip that optional behavior.
688    pub intent_record: Option<OutputDeliveryIntentRecord>,
689    /// Optional result record value.
690    /// When absent, callers should use the documented default or skip that optional behavior.
691    pub result_record: Option<OutputDeliveryResultRecord>,
692    /// Dedupe policy or key for a side-effecting operation.
693    /// Replay and repair use it to avoid sending or executing the same effect twice.
694    pub dedupe_record: Option<OutputDeliveryDedupeRecord>,
695    /// Optional reconciliation record value.
696    /// When absent, callers should use the documented default or skip that optional behavior.
697    pub reconciliation_record: Option<OutputDeliveryReconciliationRecord>,
698    /// Optional receipt value.
699    /// When absent, callers should use the documented default or skip that optional behavior.
700    pub receipt: Option<OutputDeliveryReceipt>,
701    /// Optional terminal error value.
702    /// When absent, callers should use the documented default or skip that optional behavior.
703    pub terminal_error: Option<AgentError>,
704}
705
706impl OutputDeliveryOutcome {
707    fn skipped(status: OutputDispatchStatus, summary: impl Into<String>) -> Self {
708        let _ = summary.into();
709        Self {
710            status,
711            request: None,
712            intent_record: None,
713            result_record: None,
714            dedupe_record: None,
715            reconciliation_record: None,
716            receipt: None,
717            terminal_error: None,
718        }
719    }
720}
721
722#[derive(Clone, Debug, Default)]
723/// Holds output delivery dedupe index application-layer state or configuration.
724/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
725pub struct OutputDeliveryDedupeIndex {
726    completed: Arc<Mutex<std::collections::BTreeMap<crate::domain::DedupeKey, OutputDedupeProof>>>,
727}
728
729impl OutputDeliveryDedupeIndex {
730    /// Records a completed output-delivery dedupe proof in the in-memory index.
731    /// This mutates only the local dedupe map and does not send output, append journals, or
732    /// publish events.
733    pub fn insert_completed(&self, proof: OutputDedupeProof) -> Result<(), AgentError> {
734        self.completed
735            .lock()
736            .map_err(|_| AgentError::contract_violation("output dedupe index lock poisoned"))?
737            .insert(proof.dedupe_key.clone(), proof);
738        Ok(())
739    }
740
741    /// Returns an updated value with completed configured.
742    /// This is data-only and does not perform I/O, call host ports, append journals, publish
743    /// events, or start processes.
744    pub fn completed(
745        &self,
746        dedupe_key: &crate::domain::DedupeKey,
747    ) -> Result<Option<OutputDedupeProof>, AgentError> {
748        Ok(self
749            .completed
750            .lock()
751            .map_err(|_| AgentError::contract_violation("output dedupe index lock poisoned"))?
752            .get(dedupe_key)
753            .cloned())
754    }
755}
756
757#[derive(Clone, Debug, Eq, PartialEq)]
758/// Holds output dedupe proof application-layer state or configuration.
759/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
760pub struct OutputDedupeProof {
761    /// Dedupe policy or key for a side-effecting operation.
762    /// Replay and repair use it to avoid sending or executing the same effect twice.
763    pub dedupe_key: crate::domain::DedupeKey,
764    /// Stable delivery id used for typed lineage, lookup, or dedupe.
765    pub delivery_id: OutputDeliveryId,
766    /// Stable external operation id used for typed lineage, lookup, or
767    /// dedupe.
768    pub external_operation_id: Option<String>,
769    /// Finite status for this record or lifecycle stage.
770    pub status: OutputDispatchStatus,
771}
772
773fn resolve_content_mode(
774    candidate: &OutputDeliveryCandidate,
775    capabilities: &OutputSinkCapabilities,
776    sink_ref: &OutputSinkRef,
777) -> Result<OutputContentMode, String> {
778    let requested = candidate
779        .requested_content_mode
780        .unwrap_or(candidate.policy.default_content_mode);
781    let fallback_modes = [
782        requested,
783        OutputContentMode::RedactedSummary,
784        OutputContentMode::ContentRefsOnly,
785    ];
786
787    for mode in fallback_modes {
788        if !candidate.policy.allows_mode(mode) || !capabilities.supports_content_mode(mode) {
789            continue;
790        }
791        if mode == OutputContentMode::RawContentIfPolicyAllows {
792            let Some(raw_content) = candidate.raw_content.as_ref() else {
793                continue;
794            };
795            if !candidate
796                .policy
797                .raw_content_policy
798                .allows_raw_for(sink_ref, raw_content.len())
799            {
800                continue;
801            }
802        }
803        return Ok(mode);
804    }
805
806    Err(
807        "output sink lacks required content-mode capability or policy denied raw content"
808            .to_string(),
809    )
810}
811
812fn journal_failure(error: AgentError) -> AgentError {
813    AgentError::new(
814        AgentErrorKind::JournalFailure,
815        RetryClassification::RepairNeeded,
816        error.context().message,
817    )
818}
819
820fn stable_fragment(value: &str) -> String {
821    use sha2::{Digest, Sha256};
822
823    let digest = Sha256::digest(value.as_bytes());
824    digest[..6]
825        .iter()
826        .map(|byte| format!("{byte:02x}"))
827        .collect::<String>()
828}