Skip to main content

agent_sdk_core/records/
stream.rs

1//! Durable and observable SDK records. Use these DTOs for events, journals, effects,
2//! context, output, and feature evidence. Constructing records is data-only;
3//! persistence, publication, and external actions happen through ports or application
4//! coordinators. This file contains the stream portion of that contract.
5//!
6use core::fmt;
7
8use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
9use sha2::{Digest, Sha256};
10
11use crate::{
12    domain::{
13        AgentError, AgentId, ContentRef, DestinationRef, EffectId, EntityKind, EntityRef,
14        IdValidationError, MessageId, PolicyKind, PolicyRef, PrivacyClass, RetentionClass, RunId,
15        SourceKind, SourceRef, ToolCallId, TurnId,
16    },
17    effect::{EffectIntent, EffectKind, EffectResult},
18    ids::{AttemptId, validate_identifier},
19    journal::{JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload},
20};
21
22macro_rules! typed_stream_id {
23    ($name:ident) => {
24        #[doc = concat!(
25                            "Typed stream identifier for `",
26                            stringify!($name),
27                            "`. Use it to correlate deltas, rules, matchers, and interventions; ",
28                            "constructing it is data-only and performs no side effects."
29                        )]
30        #[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
31        #[serde(transparent)]
32        pub struct $name(String);
33
34        impl $name {
35            /// Creates a new records::stream value with explicit
36            /// caller-provided inputs. This constructor is data-only
37            /// and performs no I/O or external side effects.
38            ///
39            /// # Panics
40            ///
41            /// Panics if constructor invariants fail, such as invalid identifier
42            /// text or constructor-specific bounds. Use a fallible constructor such as
43            /// `try_new` when one is available for untrusted input.
44            pub fn new(value: impl Into<String>) -> Self {
45                Self::try_new(value).expect(concat!(stringify!($name), " must be valid"))
46            }
47
48            /// Creates a new records::stream value after validation.
49            /// Returns an SDK error instead of panicking when the
50            /// identifier or input does not satisfy the contract.
51            pub fn try_new(value: impl Into<String>) -> Result<Self, IdValidationError> {
52                let value = value.into();
53                validate_identifier(&value)?;
54                Ok(Self(value))
55            }
56
57            /// Returns this value as str. The accessor is side-effect
58            /// free and keeps ownership with the caller.
59            pub fn as_str(&self) -> &str {
60                &self.0
61            }
62        }
63
64        impl From<&str> for $name {
65            fn from(value: &str) -> Self {
66                Self::new(value)
67            }
68        }
69
70        impl<'de> Deserialize<'de> for $name {
71            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
72            where
73                D: Deserializer<'de>,
74            {
75                let value = String::deserialize(deserializer)?;
76                Self::try_new(value).map_err(D::Error::custom)
77            }
78        }
79
80        impl fmt::Debug for $name {
81            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
82                formatter.write_str(concat!(stringify!($name), "(redacted)"))
83            }
84        }
85
86        impl fmt::Display for $name {
87            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
88                formatter.write_str(concat!(stringify!($name), "(redacted)"))
89            }
90        }
91    };
92}
93
94typed_stream_id!(StreamDeltaId);
95typed_stream_id!(StreamRuleId);
96typed_stream_id!(StreamInterventionId);
97typed_stream_id!(StreamMatchId);
98typed_stream_id!(MarkerId);
99typed_stream_id!(MarkerVersion);
100typed_stream_id!(MatcherEngineRef);
101
102#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
103#[serde(transparent)]
104/// Carries the rule version record payload for journal, event, or fixture surfaces.
105/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
106pub struct RuleVersion(pub u32);
107
108impl RuleVersion {
109    /// Creates a new records::stream value with explicit
110    /// caller-provided inputs. This constructor is data-only and
111    /// performs no I/O or external side effects.
112    pub fn new(version: u32) -> Self {
113        assert!(version > 0, "RuleVersion must be nonzero");
114        Self(version)
115    }
116}
117
118#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
119#[serde(rename_all = "snake_case")]
120/// Enumerates the finite stream channel cases.
121/// Serialized names are part of the SDK contract; update fixtures when variants change.
122pub enum StreamChannel {
123    /// Use this variant when the contract needs to represent assistant text; selecting it has no side effect by itself.
124    AssistantText,
125    /// Use this variant when the contract needs to represent reasoning summary; selecting it has no side effect by itself.
126    ReasoningSummary,
127    /// Use this variant when the contract needs to represent provider exposed reasoning; selecting it has no side effect by itself.
128    ProviderExposedReasoning,
129    /// Use this variant when the contract needs to represent tool call arguments; selecting it has no side effect by itself.
130    ToolCallArguments,
131    /// Use this variant when the contract needs to represent tool result text; selecting it has no side effect by itself.
132    ToolResultText,
133    /// Use this variant when the contract needs to represent realtime transcript; selecting it has no side effect by itself.
134    RealtimeTranscript,
135    /// Use this variant when the contract needs to represent realtime media; selecting it has no side effect by itself.
136    RealtimeMedia,
137    /// Use this variant when the contract needs to represent hidden chain of thought; selecting it has no side effect by itself.
138    HiddenChainOfThought,
139}
140
141impl StreamChannel {
142    /// Reports whether this value is policy visible. The check is pure
143    /// and does not mutate SDK or host state.
144    pub fn is_policy_visible(&self) -> bool {
145        !matches!(self, Self::HiddenChainOfThought)
146    }
147
148    /// Returns this value as contract name. The accessor is side-effect
149    /// free and keeps ownership with the caller.
150    pub fn as_contract_name(&self) -> &'static str {
151        match self {
152            Self::AssistantText => "assistant_text",
153            Self::ReasoningSummary => "reasoning_summary",
154            Self::ProviderExposedReasoning => "provider_exposed_reasoning",
155            Self::ToolCallArguments => "tool_call_arguments",
156            Self::ToolResultText => "tool_result_text",
157            Self::RealtimeTranscript => "realtime_transcript",
158            Self::RealtimeMedia => "realtime_media",
159            Self::HiddenChainOfThought => "hidden_chain_of_thought",
160        }
161    }
162}
163
164#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
165#[serde(rename_all = "snake_case")]
166/// Enumerates the finite stream direction cases.
167/// Serialized names are part of the SDK contract; update fixtures when variants change.
168pub enum StreamDirection {
169    /// Use this variant when the contract needs to represent input to provider; selecting it has no side effect by itself.
170    InputToProvider,
171    /// Use this variant when the contract needs to represent output from provider; selecting it has no side effect by itself.
172    OutputFromProvider,
173}
174
175#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
176#[serde(rename_all = "snake_case")]
177/// Enumerates the finite stream cursor precision cases.
178/// Serialized names are part of the SDK contract; update fixtures when variants change.
179pub enum StreamCursorPrecision {
180    /// Use this variant when the contract needs to represent byte offset; selecting it has no side effect by itself.
181    ByteOffset,
182    /// Use this variant when the contract needs to represent chunk sequence only; selecting it has no side effect by itself.
183    ChunkSequenceOnly,
184    /// Use this variant when the contract needs to represent marker; selecting it has no side effect by itself.
185    Marker,
186}
187
188#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
189/// Carries the stream cursor record payload for journal, event, or fixture surfaces.
190/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
191pub struct StreamCursor {
192    /// Chunk sequence used by this record or request.
193    pub chunk_sequence: u64,
194    /// Byte size or byte limit for byte offset.
195    /// Use it to enforce bounded reads, writes, summaries, or parser output.
196    pub byte_offset: u64,
197    /// Precision used by this record or request.
198    pub precision: StreamCursorPrecision,
199    #[serde(skip_serializing_if = "Option::is_none")]
200    /// Optional label value.
201    /// When absent, callers should use the documented default or skip that optional behavior.
202    pub label: Option<String>,
203}
204
205impl StreamCursor {
206    /// Builds the chunk value with the documented defaults.
207    /// This is data-only and does not perform I/O, call host ports, append journals, publish
208    /// events, or start processes.
209    pub fn chunk(chunk_sequence: u64) -> Self {
210        Self {
211            chunk_sequence,
212            byte_offset: 0,
213            precision: StreamCursorPrecision::ChunkSequenceOnly,
214            label: None,
215        }
216    }
217
218    /// Builds the byte value with the documented defaults.
219    /// This is data-only and does not perform I/O, call host ports, append journals, publish
220    /// events, or start processes.
221    pub fn byte(chunk_sequence: u64, byte_offset: u64) -> Self {
222        Self {
223            chunk_sequence,
224            byte_offset,
225            precision: StreamCursorPrecision::ByteOffset,
226            label: None,
227        }
228    }
229
230    /// Marker.
231    /// This is data-only and does not perform I/O, call host ports, append journals, publish
232    /// events, or start processes.
233    pub fn marker(chunk_sequence: u64, label: impl Into<String>) -> Self {
234        Self {
235            chunk_sequence,
236            byte_offset: 0,
237            precision: StreamCursorPrecision::Marker,
238            label: Some(label.into()),
239        }
240    }
241}
242
243#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
244/// Carries the stream delta record payload for journal, event, or fixture surfaces.
245/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
246pub struct StreamDelta {
247    /// Stable delta id used for typed lineage, lookup, or dedupe.
248    pub delta_id: StreamDeltaId,
249    /// Run identifier used for lineage, filtering, replay, and dedupe.
250    pub run_id: RunId,
251    /// Agent identifier used for lineage, filtering, and ownership checks.
252    pub agent_id: AgentId,
253    #[serde(skip_serializing_if = "Option::is_none")]
254    /// Turn identifier for one loop turn within a run.
255    pub turn_id: Option<TurnId>,
256    #[serde(skip_serializing_if = "Option::is_none")]
257    /// Attempt identifier for retry, repair, provider, or tool execution
258    /// evidence.
259    pub attempt_id: Option<AttemptId>,
260    #[serde(skip_serializing_if = "Option::is_none")]
261    /// Message identifier for transcript, projection, or provider-response
262    /// lineage.
263    pub message_id: Option<MessageId>,
264    #[serde(skip_serializing_if = "Option::is_none")]
265    /// Stable tool call id used for typed lineage, lookup, or dedupe.
266    pub tool_call_id: Option<ToolCallId>,
267    #[serde(skip_serializing_if = "Option::is_none")]
268    /// Stable realtime session id used for typed lineage, lookup, or dedupe.
269    pub realtime_session_id: Option<crate::realtime_records::RealtimeSessionId>,
270    /// Channel used by this record or request.
271    pub channel: StreamChannel,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    /// Optional direction value.
274    /// When absent, callers should use the documented default or skip that optional behavior.
275    pub direction: Option<StreamDirection>,
276    /// Cursor identifying a replay, export, or subscription position.
277    /// Use it to resume without widening the original scope.
278    pub cursor: StreamCursor,
279    /// Source label or ref for this item; it is metadata and does not fetch
280    /// content by itself.
281    pub source: SourceRef,
282    #[serde(skip_serializing_if = "Option::is_none")]
283    /// Destination label or ref for this item; it is metadata and does not
284    /// deliver content by itself.
285    pub destination: Option<DestinationRef>,
286    #[serde(default, skip_serializing_if = "Vec::is_empty")]
287    /// Policy references that govern admission, projection, execution, or
288    /// delivery.
289    pub policy_refs: Vec<PolicyRef>,
290    /// Privacy class used for projection, telemetry, and raw-content access
291    /// decisions.
292    pub privacy: PrivacyClass,
293    /// Retention class used by hosts and sinks when storing or exporting this
294    /// item.
295    pub retention: RetentionClass,
296    #[serde(skip_serializing_if = "Option::is_none")]
297    /// Content reference where payload bytes or structured tool output are
298    /// stored.
299    pub content_ref: Option<ContentRef>,
300    #[serde(skip_serializing_if = "Option::is_none")]
301    /// Stable marker id used for typed lineage, lookup, or dedupe.
302    pub marker_id: Option<MarkerId>,
303    /// Redacted human-readable summary safe for events, telemetry, and logs.
304    pub redacted_summary: String,
305    /// Fingerprint of the runtime package snapshot in force when this value was produced.
306    /// Use it for replay, dedupe, and package-lineage checks; the field is evidence and does
307    /// not execute package behavior.
308    pub runtime_package_fingerprint: String,
309    #[serde(skip)]
310    match_text: Option<String>,
311}
312
313impl StreamDelta {
314    /// Builds the visible text value with the documented defaults.
315    /// This is data-only and does not perform I/O, call host ports, append journals, publish
316    /// events, or start processes.
317    pub fn visible_text(
318        delta_id: impl Into<String>,
319        channel: StreamChannel,
320        cursor: StreamCursor,
321        text: impl Into<String>,
322        source: SourceRef,
323    ) -> Self {
324        let text = text.into();
325        Self {
326            delta_id: StreamDeltaId::new(delta_id),
327            run_id: RunId::new("run.stream.delta.unset"),
328            agent_id: AgentId::new("agent.stream.delta.unset"),
329            turn_id: None,
330            attempt_id: None,
331            message_id: None,
332            tool_call_id: None,
333            realtime_session_id: None,
334            channel,
335            direction: None,
336            cursor,
337            source,
338            destination: None,
339            policy_refs: Vec::new(),
340            privacy: PrivacyClass::Public,
341            retention: RetentionClass::RunScoped,
342            content_ref: None,
343            marker_id: None,
344            redacted_summary: format!("{} bytes visible stream text", text.len()),
345            runtime_package_fingerprint: "runtime.package.fingerprint.test".to_string(),
346            match_text: Some(text),
347        }
348    }
349
350    /// Marker.
351    /// This is data-only and does not perform I/O, call host ports, append journals, publish
352    /// events, or start processes.
353    pub fn marker(
354        delta_id: impl Into<String>,
355        channel: StreamChannel,
356        cursor: StreamCursor,
357        marker_id: impl Into<String>,
358        source: SourceRef,
359    ) -> Self {
360        let marker_id = MarkerId::new(marker_id);
361        Self {
362            delta_id: StreamDeltaId::new(delta_id),
363            run_id: RunId::new("run.stream.delta.unset"),
364            agent_id: AgentId::new("agent.stream.delta.unset"),
365            turn_id: None,
366            attempt_id: None,
367            message_id: None,
368            tool_call_id: None,
369            realtime_session_id: None,
370            channel,
371            direction: None,
372            cursor,
373            source,
374            destination: None,
375            policy_refs: Vec::new(),
376            privacy: PrivacyClass::Internal,
377            retention: RetentionClass::RunScoped,
378            content_ref: None,
379            marker_id: Some(marker_id),
380            redacted_summary: "typed stream marker".to_string(),
381            runtime_package_fingerprint: "runtime.package.fingerprint.test".to_string(),
382            match_text: None,
383        }
384    }
385
386    /// Returns this value with its run setting replaced. The method
387    /// follows builder-style data construction and does not execute
388    /// external work.
389    pub fn with_run(mut self, run_id: RunId, agent_id: AgentId) -> Self {
390        self.run_id = run_id;
391        self.agent_id = agent_id;
392        self
393    }
394
395    /// Returns this value with its turn setting replaced. The method
396    /// follows builder-style data construction and does not execute
397    /// external work.
398    pub fn with_turn(mut self, turn_id: TurnId) -> Self {
399        self.turn_id = Some(turn_id);
400        self
401    }
402
403    /// Returns this value with its attempt setting replaced. The method
404    /// follows builder-style data construction and does not execute
405    /// external work.
406    pub fn with_attempt(mut self, attempt_id: AttemptId) -> Self {
407        self.attempt_id = Some(attempt_id);
408        self
409    }
410
411    /// Returns this value with its direction setting replaced. The
412    /// method follows builder-style data construction and does not
413    /// execute external work.
414    pub fn with_direction(mut self, direction: StreamDirection) -> Self {
415        self.direction = Some(direction);
416        self
417    }
418
419    /// Returns this value with its destination setting replaced. The
420    /// method follows builder-style data construction and does not
421    /// execute external work.
422    pub fn with_destination(mut self, destination: DestinationRef) -> Self {
423        self.destination = Some(destination);
424        self
425    }
426
427    /// Returns this value with its content ref setting replaced. The
428    /// method follows builder-style data construction and does not
429    /// execute external work.
430    pub fn with_content_ref(mut self, content_ref: ContentRef) -> Self {
431        self.content_ref = Some(content_ref);
432        self
433    }
434
435    /// Returns the matcher text currently held by this value.
436    /// This is data-only and does not perform I/O, call host ports, append journals, publish
437    /// events, or start processes.
438    pub fn matcher_text(&self) -> Option<&str> {
439        if !self.channel.is_policy_visible() || self.privacy == PrivacyClass::Secret {
440            return None;
441        }
442        self.match_text.as_deref()
443    }
444
445    /// Returns whether serialized raw text absent applies for this contract.
446    /// This is data-only and does not perform I/O, call host ports, append journals, publish
447    /// events, or start processes.
448    pub fn serialized_raw_text_absent(&self) -> bool {
449        serde_json::to_string(self)
450            .map(|json| {
451                self.match_text
452                    .as_ref()
453                    .is_none_or(|text| !json.contains(text))
454            })
455            .unwrap_or(false)
456    }
457}
458
459#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
460/// Carries the stream channel selector record payload for journal, event, or fixture surfaces.
461/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
462pub struct StreamChannelSelector {
463    /// Channel used by this record or request.
464    pub channel: StreamChannel,
465    #[serde(skip_serializing_if = "Option::is_none")]
466    /// Optional direction value.
467    /// When absent, callers should use the documented default or skip that optional behavior.
468    pub direction: Option<StreamDirection>,
469}
470
471impl StreamChannelSelector {
472    /// Builds the channel value with the documented defaults.
473    /// This is data-only and does not perform I/O, call host ports, append journals, publish
474    /// events, or start processes.
475    pub fn channel(channel: StreamChannel) -> Self {
476        Self {
477            channel,
478            direction: None,
479        }
480    }
481
482    /// Returns matches for the current value.
483    /// This is a read-only or data-construction helper unless the method body explicitly calls
484    /// a port or store.
485    pub fn matches(&self, delta: &StreamDelta) -> bool {
486        self.channel == delta.channel
487            && self
488                .direction
489                .as_ref()
490                .is_none_or(|direction| Some(direction) == delta.direction.as_ref())
491    }
492}
493
494#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
495#[serde(rename_all = "snake_case")]
496/// Enumerates the finite regex dialect cases.
497/// Serialized names are part of the SDK contract; update fixtures when variants change.
498pub enum RegexDialect {
499    /// Use this variant when the contract needs to represent safe subset; selecting it has no side effect by itself.
500    SafeSubset,
501}
502
503#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
504#[serde(tag = "kind", rename_all = "snake_case")]
505/// Enumerates the finite stream matcher cases.
506/// Serialized names are part of the SDK contract; update fixtures when variants change.
507pub enum StreamMatcher {
508    /// Use this variant when the contract needs to represent literal; selecting it has no side effect by itself.
509    Literal {
510        /// Text used by this record or request.
511        text: String,
512        /// Whether case sensitive is enabled.
513        /// Policy, validation, or routing code uses this flag to choose the explicit behavior.
514        case_sensitive: bool,
515        /// window bytes used for bounds checks, summaries, or truncation
516        /// evidence.
517        window_bytes: u64,
518    },
519    /// Use this variant when the contract needs to represent regex; selecting it has no side effect by itself.
520    Regex {
521        /// Search pattern supplied by the caller.
522        /// The grep executor compiles it under regex and output bounds before reading files.
523        pattern: String,
524        /// Schema dialect used to interpret the output schema.
525        /// Validators use it to select the supported JSON-schema subset and compatibility
526        /// rules.
527        dialect: RegexDialect,
528        /// window bytes used for bounds checks, summaries, or truncation
529        /// evidence.
530        window_bytes: u64,
531        /// Timeout budget in milliseconds for the requested operation.
532        timeout_ms: u64,
533    },
534    /// Use this variant when the contract needs to represent marker; selecting it has no side effect by itself.
535    Marker {
536        /// Stable marker id used for typed lineage, lookup, or dedupe.
537        marker_id: MarkerId,
538        /// Version string for this capability, package, or protocol surface.
539        /// Use it for compatibility checks during package or adapter resolution.
540        marker_version: MarkerVersion,
541    },
542    /// Use this variant when the contract needs to represent host matcher; selecting it has no side effect by itself.
543    HostMatcher {
544        /// Typed matcher ref reference. Resolving or executing it is a
545        /// separate policy-gated step.
546        matcher_ref: MatcherEngineRef,
547        #[serde(skip_serializing_if = "Option::is_none")]
548        /// Typed risk policy ref reference. Resolving or executing it is a
549        /// separate policy-gated step.
550        risk_policy_ref: Option<PolicyRef>,
551    },
552}
553
554impl StreamMatcher {
555    /// Builds the literal value with the documented defaults.
556    /// This is data-only and does not perform I/O, call host ports, append journals, publish
557    /// events, or start processes.
558    pub fn literal(text: impl Into<String>, case_sensitive: bool, window_bytes: u64) -> Self {
559        Self::Literal {
560            text: text.into(),
561            case_sensitive,
562            window_bytes,
563        }
564    }
565
566    /// Builds the regex with limits value with the documented defaults.
567    /// This is data-only and does not perform I/O, call host ports, append journals, publish
568    /// events, or start processes.
569    pub fn regex_with_limits(
570        pattern: impl Into<String>,
571        window_bytes: u64,
572        timeout_ms: u64,
573    ) -> Self {
574        Self::Regex {
575            pattern: pattern.into(),
576            dialect: RegexDialect::SafeSubset,
577            window_bytes,
578            timeout_ms,
579        }
580    }
581
582    /// Marker.
583    /// This is data-only and does not perform I/O, call host ports, append journals, publish
584    /// events, or start processes.
585    pub fn marker(marker_id: impl Into<String>) -> Self {
586        Self::Marker {
587            marker_id: MarkerId::new(marker_id),
588            marker_version: MarkerVersion::new("marker.v1"),
589        }
590    }
591
592    /// Builds the window bytes value with the documented defaults.
593    /// This is data-only and does not perform I/O, call host ports, append journals, publish
594    /// events, or start processes.
595    pub fn window_bytes(&self) -> u64 {
596        match self {
597            Self::Literal { window_bytes, .. } | Self::Regex { window_bytes, .. } => *window_bytes,
598            Self::Marker { .. } | Self::HostMatcher { .. } => 0,
599        }
600    }
601
602    /// Returns the kind name currently held by this value.
603    /// This is data-only and does not perform I/O, call host ports, append journals, publish
604    /// events, or start processes.
605    pub fn kind_name(&self) -> &'static str {
606        match self {
607            Self::Literal { .. } => "literal",
608            Self::Regex { .. } => "regex",
609            Self::Marker { .. } => "marker",
610            Self::HostMatcher { .. } => "host_matcher",
611        }
612    }
613
614    /// Validates the records::stream invariants and returns a typed
615    /// error on failure. Validation is pure and does not perform I/O,
616    /// dispatch, journal appends, or adapter calls.
617    pub fn validate(&self) -> Result<(), AgentError> {
618        match self {
619            Self::Literal {
620                text, window_bytes, ..
621            } => {
622                if text.is_empty() {
623                    return Err(AgentError::missing_required_field(
624                        "stream_matcher.literal.text",
625                    ));
626                }
627                validate_window(*window_bytes)
628            }
629            Self::Regex {
630                pattern,
631                window_bytes,
632                timeout_ms,
633                ..
634            } => {
635                validate_window(*window_bytes)?;
636                if *timeout_ms == 0 || *timeout_ms > 1000 {
637                    return Err(AgentError::contract_violation(
638                        "stream regex timeout must be between 1 and 1000ms",
639                    ));
640                }
641                validate_safe_regex(pattern)
642            }
643            Self::Marker { marker_id, .. } => {
644                if marker_id.as_str().is_empty() {
645                    return Err(AgentError::missing_required_field(
646                        "stream_matcher.marker.marker_id",
647                    ));
648                }
649                Ok(())
650            }
651            Self::HostMatcher {
652                risk_policy_ref, ..
653            } => {
654                if risk_policy_ref.is_none() {
655                    return Err(AgentError::contract_violation(
656                        "host stream matcher requires declared risk policy ref",
657                    ));
658                }
659                Ok(())
660            }
661        }
662    }
663}
664
665#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
666#[serde(rename_all = "snake_case")]
667/// Enumerates the finite stream rule scope cases.
668/// Serialized names are part of the SDK contract; update fixtures when variants change.
669pub enum StreamRuleScope {
670    /// Use this variant when the contract needs to represent run; selecting it has no side effect by itself.
671    Run,
672    /// Use this variant when the contract needs to represent turn; selecting it has no side effect by itself.
673    Turn,
674    /// Use this variant when the contract needs to represent attempt; selecting it has no side effect by itself.
675    Attempt,
676    /// Use this variant when the contract needs to represent realtime session; selecting it has no side effect by itself.
677    RealtimeSession,
678}
679
680#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
681#[serde(rename_all = "snake_case")]
682/// Enumerates the finite partial output policy cases.
683/// Serialized names are part of the SDK contract; update fixtures when variants change.
684pub enum PartialOutputPolicy {
685    /// Use this variant when the contract needs to represent keep; selecting it has no side effect by itself.
686    Keep,
687    /// Use this variant when the contract needs to represent discard; selecting it has no side effect by itself.
688    Discard,
689    /// Use this variant when the contract needs to represent mask; selecting it has no side effect by itself.
690    Mask,
691    /// Use this variant when the contract needs to represent content ref only; selecting it has no side effect by itself.
692    ContentRefOnly,
693}
694
695#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
696#[serde(rename_all = "snake_case")]
697/// Enumerates the finite repeat policy cases.
698/// Serialized names are part of the SDK contract; update fixtures when variants change.
699pub enum RepeatPolicy {
700    /// Use this variant when the contract needs to represent always; selecting it has no side effect by itself.
701    Always,
702    /// Use this variant when the contract needs to represent once per run; selecting it has no side effect by itself.
703    OncePerRun,
704    /// Use this variant when the contract needs to represent once per turn; selecting it has no side effect by itself.
705    OncePerTurn,
706    /// Use this variant when the contract needs to represent once per attempt and span; selecting it has no side effect by itself.
707    OncePerAttemptAndSpan,
708}
709
710#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
711#[serde(rename_all = "snake_case")]
712/// Enumerates the finite match privacy policy cases.
713/// Serialized names are part of the SDK contract; update fixtures when variants change.
714pub enum MatchPrivacyPolicy {
715    /// Use this variant when the contract needs to represent hash length and summary; selecting it has no side effect by itself.
716    HashLengthAndSummary,
717    /// Use this variant when the contract needs to represent raw capture if policy allows; selecting it has no side effect by itself.
718    RawCaptureIfPolicyAllows,
719}
720
721#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
722#[serde(tag = "action", rename_all = "snake_case")]
723/// Enumerates the finite stream action cases.
724/// Serialized names are part of the SDK contract; update fixtures when variants change.
725pub enum StreamAction {
726    /// Use this variant when the contract needs to represent stop run; selecting it has no side effect by itself.
727    StopRun {
728        /// Redacted explanation for a denial, failure, status, or package
729        /// delta.
730        reason: String,
731        /// Partial output used by this record or request.
732        partial_output: PartialOutputPolicy,
733    },
734    /// Use this variant when the contract needs to represent abort and retry; selecting it has no side effect by itself.
735    AbortAndRetry {
736        /// Redacted summary for display, logs, events, or telemetry.
737        /// It should describe the value without exposing raw private content.
738        injection_summary: String,
739        /// Typed retry policy ref reference. Resolving or executing it is a
740        /// separate policy-gated step.
741        retry_policy_ref: PolicyRef,
742        /// Partial output used by this record or request.
743        partial_output: PartialOutputPolicy,
744    },
745    /// Use this variant when the contract needs to represent pause for approval; selecting it has no side effect by itself.
746    PauseForApproval {
747        /// Typed approval policy ref reference. Resolving or executing it is
748        /// a separate policy-gated step.
749        approval_policy_ref: PolicyRef,
750        /// Typed resume policy ref reference. Resolving or executing it is a
751        /// separate policy-gated step.
752        resume_policy_ref: PolicyRef,
753    },
754    /// Use this variant when the contract needs to represent mask and continue; selecting it has no side effect by itself.
755    MaskAndContinue {
756        /// Replacement used by this record or request.
757        replacement: String,
758    },
759    /// Use this variant when the contract needs to represent emit only; selecting it has no side effect by itself.
760    EmitOnly {
761        /// Kind discriminator for notice kind.
762        /// Use it to route finite match arms without parsing display text.
763        notice_kind: String,
764    },
765}
766
767impl StreamAction {
768    /// Returns an updated value with mask and continue configured.
769    /// This is data-only and does not perform I/O, call host ports, append journals, publish
770    /// events, or start processes.
771    pub fn mask_and_continue(replacement: impl Into<String>) -> Self {
772        Self::MaskAndContinue {
773            replacement: replacement.into(),
774        }
775    }
776
777    /// Returns an updated value with abort and retry configured.
778    /// This is data-only and does not perform I/O, call host ports, append journals, publish
779    /// events, or start processes.
780    pub fn abort_and_retry(
781        injection_summary: impl Into<String>,
782        retry_policy_ref: PolicyRef,
783    ) -> Self {
784        Self::AbortAndRetry {
785            injection_summary: injection_summary.into(),
786            retry_policy_ref,
787            partial_output: PartialOutputPolicy::Discard,
788        }
789    }
790
791    /// Builds the emit only value with the documented defaults.
792    /// This is data-only and does not perform I/O, call host ports, append journals, publish
793    /// events, or start processes.
794    pub fn emit_only(notice_kind: impl Into<String>) -> Self {
795        Self::EmitOnly {
796            notice_kind: notice_kind.into(),
797        }
798    }
799
800    /// Returns the action kind currently held by this value.
801    /// This is data-only and does not perform I/O, call host ports, append journals, publish
802    /// events, or start processes.
803    pub fn action_kind(&self) -> &'static str {
804        match self {
805            Self::StopRun { .. } => "stop_run",
806            Self::AbortAndRetry { .. } => "abort_and_retry",
807            Self::PauseForApproval { .. } => "pause_for_approval",
808            Self::MaskAndContinue { .. } => "mask_and_continue",
809            Self::EmitOnly { .. } => "emit_only",
810        }
811    }
812
813    /// Builds the partial output policy value.
814    /// This is data construction and performs no I/O, journal append, event publication, or
815    /// process work.
816    pub fn partial_output_policy(&self) -> PartialOutputPolicy {
817        match self {
818            Self::StopRun { partial_output, .. } | Self::AbortAndRetry { partial_output, .. } => {
819                partial_output.clone()
820            }
821            Self::PauseForApproval { .. } | Self::EmitOnly { .. } => PartialOutputPolicy::Keep,
822            Self::MaskAndContinue { .. } => PartialOutputPolicy::Mask,
823        }
824    }
825
826    /// Returns the effect kind hint currently held by this value.
827    /// This is data-only and does not perform I/O, call host ports, append journals, publish
828    /// events, or start processes.
829    pub fn effect_kind_hint(&self) -> Option<EffectKind> {
830        match self {
831            Self::AbortAndRetry { .. } => Some(EffectKind::ProviderRequest),
832            Self::PauseForApproval { .. } => Some(EffectKind::ApprovalDispatch),
833            Self::StopRun { .. } | Self::MaskAndContinue { .. } | Self::EmitOnly { .. } => None,
834        }
835    }
836}
837
838#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
839/// Carries the stream rule record payload for journal, event, or fixture surfaces.
840/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
841pub struct StreamRule {
842    /// Stable identifier for this record.
843    pub id: StreamRuleId,
844    /// Version string for this capability, package, or protocol surface.
845    /// Use it for compatibility checks during package or adapter resolution.
846    pub version: RuleVersion,
847    /// Source label or ref for this item; it is metadata and does not fetch
848    /// content by itself.
849    pub source: SourceRef,
850    /// Matcher used by this record or request.
851    pub matcher: StreamMatcher,
852    /// Collection of channels values.
853    /// Ordering and membership should be treated as part of the serialized contract when
854    /// relevant.
855    pub channels: Vec<StreamChannelSelector>,
856    /// Scope used by this record or request.
857    pub scope: StreamRuleScope,
858    /// Action used by this record or request.
859    pub action: StreamAction,
860    /// Repeat used by this record or request.
861    pub repeat: RepeatPolicy,
862    /// Privacy class used for projection, telemetry, and raw-content access
863    /// decisions.
864    pub privacy: MatchPrivacyPolicy,
865    #[serde(default, skip_serializing_if = "Vec::is_empty")]
866    /// Policy references that govern admission, projection, execution, or
867    /// delivery.
868    pub policy_refs: Vec<PolicyRef>,
869}
870
871impl StreamRule {
872    /// Starts a builder for this records::stream value. Building is
873    /// data-only; runtime side effects occur only when a later
874    /// coordinator or host port executes the built configuration.
875    pub fn builder(id: StreamRuleId) -> StreamRuleBuilder {
876        StreamRuleBuilder::new(id)
877    }
878
879    /// Returns an updated value with mask regex configured.
880    /// This is data-only and does not perform I/O, call host ports, append journals, publish
881    /// events, or start processes.
882    pub fn mask_regex(id: impl Into<String>, pattern: impl Into<String>) -> StreamRuleBuilder {
883        StreamRuleBuilder::new(StreamRuleId::new(id))
884            .source(SourceRef::with_kind(
885                SourceKind::Host,
886                "source.host.stream_rules",
887            ))
888            .matcher(StreamMatcher::regex_with_limits(pattern, 4096, 25))
889            .action(StreamAction::mask_and_continue("[redacted]"))
890    }
891
892    /// Validates the records::stream invariants and returns a typed
893    /// error on failure. Validation is pure and does not perform I/O,
894    /// dispatch, journal appends, or adapter calls.
895    pub fn validate(&self) -> Result<(), AgentError> {
896        self.matcher.validate()?;
897        if self.channels.is_empty() {
898            return Err(AgentError::missing_required_field("stream_rule.channels"));
899        }
900        if self.policy_refs.is_empty() {
901            return Err(AgentError::missing_required_field(
902                "stream_rule.policy_refs",
903            ));
904        }
905        Ok(())
906    }
907}
908
909#[derive(Clone, Debug)]
910/// Carries the stream rule builder record payload for journal, event, or fixture surfaces.
911/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
912pub struct StreamRuleBuilder {
913    rule: StreamRule,
914}
915
916impl StreamRuleBuilder {
917    /// Creates a new records::stream value with explicit
918    /// caller-provided inputs. This constructor is data-only and
919    /// performs no I/O or external side effects.
920    pub fn new(id: StreamRuleId) -> Self {
921        Self {
922            rule: StreamRule {
923                id,
924                version: RuleVersion::new(1),
925                source: SourceRef::with_kind(SourceKind::Host, "source.host.stream_rules"),
926                matcher: StreamMatcher::literal("unset", true, 64),
927                channels: Vec::new(),
928                scope: StreamRuleScope::Run,
929                action: StreamAction::emit_only("notice"),
930                repeat: RepeatPolicy::OncePerAttemptAndSpan,
931                privacy: MatchPrivacyPolicy::HashLengthAndSummary,
932                policy_refs: Vec::new(),
933            },
934        }
935    }
936
937    /// Returns an updated records::stream value with source applied. This is
938    /// data construction only and does not execute the configured behavior.
939    pub fn source(mut self, source: SourceRef) -> Self {
940        self.rule.source = source;
941        self
942    }
943
944    /// Returns an updated value with matcher configured.
945    /// This is data-only and does not perform I/O, call host ports, append journals, publish
946    /// events, or start processes.
947    pub fn matcher(mut self, matcher: StreamMatcher) -> Self {
948        self.rule.matcher = matcher;
949        self
950    }
951
952    /// Returns an updated value with on configured.
953    /// This is data-only and does not perform I/O, call host ports, append journals, publish
954    /// events, or start processes.
955    pub fn on(mut self, channel: StreamChannel) -> Self {
956        self.rule
957            .channels
958            .push(StreamChannelSelector::channel(channel));
959        self
960    }
961
962    /// Returns an updated records::stream value with action applied. This is
963    /// data construction only and does not execute the configured behavior.
964    pub fn action(mut self, action: StreamAction) -> Self {
965        self.rule.action = action;
966        self
967    }
968
969    /// Returns an updated value with repeat configured.
970    /// This is data-only and does not perform I/O, call host ports, append journals, publish
971    /// events, or start processes.
972    pub fn repeat(mut self, repeat: RepeatPolicy) -> Self {
973        self.rule.repeat = repeat;
974        self
975    }
976
977    /// Returns policy for the current value.
978    /// This is a read-only or data-construction helper unless the method body explicitly calls
979    /// a port or store.
980    pub fn policy(mut self, policy_ref: PolicyRef) -> Self {
981        self.rule.policy_refs.push(policy_ref);
982        self
983    }
984
985    /// Finishes builder validation and returns the configured value.
986    /// This is data-only unless the surrounding builder explicitly
987    /// documents adapter or store access.
988    pub fn build(self) -> Result<StreamRule, AgentError> {
989        self.rule.validate()?;
990        Ok(self.rule)
991    }
992}
993
994#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
995/// Carries the redacted match record payload for journal, event, or fixture surfaces.
996/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
997pub struct RedactedMatch {
998    /// Stable match id used for typed lineage, lookup, or dedupe.
999    pub match_id: StreamMatchId,
1000    /// Channel used by this record or request.
1001    pub channel: StreamChannel,
1002    #[serde(skip_serializing_if = "Option::is_none")]
1003    /// Optional direction value.
1004    /// When absent, callers should use the documented default or skip that optional behavior.
1005    pub direction: Option<StreamDirection>,
1006    /// Cursor identifying a replay, export, or subscription position.
1007    /// Use it to resume without widening the original scope.
1008    pub cursor: StreamCursor,
1009    /// Observed byte length for the source, sidecar, or extracted record.
1010    pub byte_len: usize,
1011    /// Deterministic text hash used for stale checks, package evidence, or
1012    /// replay comparisons.
1013    pub text_hash: String,
1014    /// Redacted human-readable summary safe for events, telemetry, and logs.
1015    pub redacted_summary: String,
1016    /// Privacy class used for projection, telemetry, and raw-content access
1017    /// decisions.
1018    pub privacy: PrivacyClass,
1019    /// Retention class used by hosts and sinks when storing or exporting this
1020    /// item.
1021    pub retention: RetentionClass,
1022}
1023
1024impl RedactedMatch {
1025    /// Constructs this value from text. Use it when adapting canonical
1026    /// SDK records without introducing a second behavior path.
1027    pub fn from_text(rule: &StreamRule, delta: &StreamDelta, matched_text: &str) -> Self {
1028        let hash = format!("sha256:{:x}", Sha256::digest(matched_text.as_bytes()));
1029        Self {
1030            match_id: StreamMatchId::new(format!(
1031                "stream.match.{}.{}",
1032                safe_id_fragment(rule.id.as_str()),
1033                &hash["sha256:".len()..("sha256:".len() + 12)]
1034            )),
1035            channel: delta.channel.clone(),
1036            direction: delta.direction.clone(),
1037            cursor: delta.cursor.clone(),
1038            byte_len: matched_text.len(),
1039            text_hash: hash,
1040            redacted_summary: format!(
1041                "stream match redacted: {} bytes on {}",
1042                matched_text.len(),
1043                delta.channel.as_contract_name()
1044            ),
1045            privacy: PrivacyClass::ContentRefsOnly,
1046            retention: RetentionClass::RunScoped,
1047        }
1048    }
1049}
1050
1051#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1052/// Carries the stream match ref record payload for journal, event, or fixture surfaces.
1053/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1054pub struct StreamMatchRef {
1055    /// Stable match id used for typed lineage, lookup, or dedupe.
1056    pub match_id: StreamMatchId,
1057    /// Stable rule id used for typed lineage, lookup, or dedupe.
1058    pub rule_id: StreamRuleId,
1059    /// Version string for this capability, package, or protocol surface.
1060    /// Use it for compatibility checks during package or adapter resolution.
1061    pub rule_version: RuleVersion,
1062}
1063
1064#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1065/// Carries the stream intervention record payload for journal, event, or fixture surfaces.
1066/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1067pub struct StreamIntervention {
1068    /// Stable intervention id used for typed lineage, lookup, or dedupe.
1069    pub intervention_id: StreamInterventionId,
1070    /// Typed rule ref reference. Resolving or executing it is a separate
1071    /// policy-gated step.
1072    pub rule_ref: EntityRef,
1073    /// Requested action used by this record or request.
1074    pub requested_action: StreamAction,
1075    #[serde(skip_serializing_if = "Option::is_none")]
1076    /// Optional applied action value.
1077    /// When absent, callers should use the documented default or skip that optional behavior.
1078    pub applied_action: Option<StreamAction>,
1079    /// Typed match ref reference. Resolving or executing it is a separate
1080    /// policy-gated step.
1081    pub match_ref: StreamMatchRef,
1082    /// Redacted match used by this record or request.
1083    pub redacted_match: RedactedMatch,
1084    /// Partial output policy used by this record or request.
1085    pub partial_output_policy: PartialOutputPolicy,
1086    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1087    /// Policy references that govern admission, projection, execution, or
1088    /// delivery.
1089    pub policy_refs: Vec<PolicyRef>,
1090    #[serde(skip_serializing_if = "Option::is_none")]
1091    /// Typed effect intent ref reference. Resolving or executing it is a
1092    /// separate policy-gated step.
1093    pub effect_intent_ref: Option<EffectId>,
1094    #[serde(skip_serializing_if = "Option::is_none")]
1095    /// Typed effect result ref reference. Resolving or executing it is a
1096    /// separate policy-gated step.
1097    pub effect_result_ref: Option<EffectId>,
1098    #[serde(skip_serializing_if = "Option::is_none")]
1099    /// Optional effect intent value.
1100    /// When absent, callers should use the documented default or skip that optional behavior.
1101    pub effect_intent: Option<EffectIntent>,
1102    #[serde(skip_serializing_if = "Option::is_none")]
1103    /// Optional effect result value.
1104    /// When absent, callers should use the documented default or skip that optional behavior.
1105    pub effect_result: Option<EffectResult>,
1106}
1107
1108impl StreamIntervention {
1109    /// Builds the proposed value.
1110    /// This is data construction and performs no I/O, journal append, event publication, or
1111    /// process work.
1112    pub fn proposed(rule: &StreamRule, redacted_match: RedactedMatch) -> Self {
1113        let match_ref = StreamMatchRef {
1114            match_id: redacted_match.match_id.clone(),
1115            rule_id: rule.id.clone(),
1116            rule_version: rule.version,
1117        };
1118        Self {
1119            intervention_id: StreamInterventionId::new(format!(
1120                "stream.intervention.{}.{}",
1121                safe_id_fragment(rule.id.as_str()),
1122                safe_id_fragment(redacted_match.match_id.as_str())
1123            )),
1124            rule_ref: EntityRef::new(EntityKind::StreamRule, rule.id.as_str()),
1125            requested_action: rule.action.clone(),
1126            applied_action: None,
1127            match_ref,
1128            redacted_match,
1129            partial_output_policy: rule.action.partial_output_policy(),
1130            policy_refs: rule.policy_refs.clone(),
1131            effect_intent_ref: None,
1132            effect_result_ref: None,
1133            effect_intent: None,
1134            effect_result: None,
1135        }
1136    }
1137
1138    /// Returns this value with its effect intent setting replaced. The
1139    /// method follows builder-style data construction and does not
1140    /// execute external work.
1141    pub fn with_effect_intent(mut self, effect_id: EffectId) -> Self {
1142        if let Some(kind) = self.requested_action.effect_kind_hint() {
1143            let mut intent = EffectIntent::new(
1144                effect_id.clone(),
1145                kind,
1146                self.rule_ref.clone(),
1147                SourceRef::with_kind(SourceKind::Sdk, "source.sdk.stream_rule"),
1148                format!(
1149                    "stream intervention {} requested for redacted match",
1150                    self.requested_action.action_kind()
1151                ),
1152            );
1153            intent.policy_refs = self.policy_refs.clone();
1154            self.effect_intent_ref = Some(effect_id);
1155            self.effect_intent = Some(intent);
1156        }
1157        self
1158    }
1159
1160    /// Returns this value with its effect result setting replaced. The
1161    /// method follows builder-style data construction and does not
1162    /// execute external work.
1163    pub fn with_effect_result(mut self, result: EffectResult) -> Self {
1164        self.effect_result_ref = Some(result.effect_id.clone());
1165        self.effect_result = Some(result);
1166        self
1167    }
1168
1169    /// Returns the effect kind name currently held by this value.
1170    /// This is data-only and does not perform I/O, call host ports, append journals, publish
1171    /// events, or start processes.
1172    pub fn effect_kind_name(&self) -> Option<&'static str> {
1173        self.effect_intent.as_ref().map(|intent| match intent.kind {
1174            EffectKind::ProviderRequest => "provider_request",
1175            EffectKind::ToolExecution => "tool_execution",
1176            EffectKind::ApprovalDispatch => "approval_dispatch",
1177            EffectKind::MemoryWrite => "memory_write",
1178            EffectKind::ExtensionAction => "extension_action",
1179            EffectKind::OutputDelivery => "output_delivery",
1180            EffectKind::FileWrite => "file_write",
1181            EffectKind::ProcessStart => "process_start",
1182            EffectKind::ProcessSignal => "process_signal",
1183            EffectKind::IsolatedProcessStart => "isolated_process_start",
1184            EffectKind::ChildAgentStart => "child_agent_start",
1185            EffectKind::RunMessageDelivery => "run_message_delivery",
1186            EffectKind::ChildArtifactShutdown => "child_artifact_shutdown",
1187            EffectKind::DetachTransfer => "detach_transfer",
1188            EffectKind::HookMutation => "hook_mutation",
1189        })
1190    }
1191}
1192
1193#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1194#[serde(rename_all = "snake_case")]
1195/// Enumerates the finite stream rule record kind cases.
1196/// Serialized names are part of the SDK contract; update fixtures when variants change.
1197pub enum StreamRuleRecordKind {
1198    /// Use this variant when the contract needs to represent registered; selecting it has no side effect by itself.
1199    Registered,
1200    /// Use this variant when the contract needs to represent compile failed; selecting it has no side effect by itself.
1201    CompileFailed,
1202    /// Use this variant when the contract needs to represent matched; selecting it has no side effect by itself.
1203    Matched,
1204    /// Use this variant when the contract needs to represent intervention intent; selecting it has no side effect by itself.
1205    InterventionIntent,
1206    /// Use this variant when the contract needs to represent intervention result; selecting it has no side effect by itself.
1207    InterventionResult,
1208    /// Use this variant when the contract needs to represent repeat state; selecting it has no side effect by itself.
1209    RepeatState,
1210}
1211
1212#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1213/// Carries the stream rule repeat state snapshot record payload for journal, event, or fixture surfaces.
1214/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1215pub struct StreamRuleRepeatStateSnapshot {
1216    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1217    /// Collection of seen match keys values.
1218    /// Ordering and membership should be treated as part of the serialized contract when
1219    /// relevant.
1220    pub seen_match_keys: Vec<String>,
1221}
1222
1223#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1224/// Carries the stream rule record record payload for journal, event, or fixture surfaces.
1225/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1226pub struct StreamRuleRecord {
1227    /// Kind discriminator for record kind.
1228    /// Use it to route finite match arms without parsing display text.
1229    pub record_kind: StreamRuleRecordKind,
1230    /// Stable rule id used for typed lineage, lookup, or dedupe.
1231    pub rule_id: StreamRuleId,
1232    /// Version string for this capability, package, or protocol surface.
1233    /// Use it for compatibility checks during package or adapter resolution.
1234    pub rule_version: RuleVersion,
1235    #[serde(skip_serializing_if = "Option::is_none")]
1236    /// Optional channel value.
1237    /// When absent, callers should use the documented default or skip that optional behavior.
1238    pub channel: Option<StreamChannel>,
1239    #[serde(skip_serializing_if = "Option::is_none")]
1240    /// Optional direction value.
1241    /// When absent, callers should use the documented default or skip that optional behavior.
1242    pub direction: Option<StreamDirection>,
1243    #[serde(skip_serializing_if = "Option::is_none")]
1244    /// Cursor identifying a replay, export, or subscription position.
1245    /// Use it to resume without widening the original scope.
1246    pub cursor: Option<StreamCursor>,
1247    #[serde(skip_serializing_if = "Option::is_none")]
1248    /// Optional redacted match value.
1249    /// When absent, callers should use the documented default or skip that optional behavior.
1250    pub redacted_match: Option<RedactedMatch>,
1251    #[serde(skip_serializing_if = "Option::is_none")]
1252    /// Optional intervention value.
1253    /// When absent, callers should use the documented default or skip that optional behavior.
1254    pub intervention: Option<StreamIntervention>,
1255    #[serde(skip_serializing_if = "Option::is_none")]
1256    /// Optional repeat state value.
1257    /// When absent, callers should use the documented default or skip that optional behavior.
1258    pub repeat_state: Option<StreamRuleRepeatStateSnapshot>,
1259    #[serde(default, skip_serializing_if = "Vec::is_empty")]
1260    /// Policy references that govern admission, projection, execution, or
1261    /// delivery.
1262    pub policy_refs: Vec<PolicyRef>,
1263    /// Redacted human-readable summary safe for events, telemetry, and logs.
1264    pub redacted_summary: String,
1265}
1266
1267impl StreamRuleRecord {
1268    /// Builds the matched value.
1269    /// This is data construction and performs no I/O, journal append, event publication, or
1270    /// process work.
1271    pub fn matched(rule: &StreamRule, intervention: &StreamIntervention) -> Self {
1272        Self {
1273            record_kind: StreamRuleRecordKind::Matched,
1274            rule_id: rule.id.clone(),
1275            rule_version: rule.version,
1276            channel: Some(intervention.redacted_match.channel.clone()),
1277            direction: intervention.redacted_match.direction.clone(),
1278            cursor: Some(intervention.redacted_match.cursor.clone()),
1279            redacted_match: Some(intervention.redacted_match.clone()),
1280            intervention: Some(intervention.clone()),
1281            repeat_state: None,
1282            policy_refs: rule.policy_refs.clone(),
1283            redacted_summary: "stream rule matched redacted content".to_string(),
1284        }
1285    }
1286
1287    /// Builds the repeat state value.
1288    /// This is data construction and performs no I/O, journal append, event publication, or
1289    /// process work.
1290    pub fn repeat_state(rule: &StreamRule, repeat_state: StreamRuleRepeatStateSnapshot) -> Self {
1291        Self {
1292            record_kind: StreamRuleRecordKind::RepeatState,
1293            rule_id: rule.id.clone(),
1294            rule_version: rule.version,
1295            channel: None,
1296            direction: None,
1297            cursor: None,
1298            redacted_match: None,
1299            intervention: None,
1300            repeat_state: Some(repeat_state),
1301            policy_refs: rule.policy_refs.clone(),
1302            redacted_summary: "stream rule repeat state snapshot".to_string(),
1303        }
1304    }
1305
1306    /// Converts this value into journal record data.
1307    /// This is data-only and does not perform I/O, call host ports, append journals, publish
1308    /// events, or start processes.
1309    pub fn to_journal_record(&self, base: JournalRecordBase) -> JournalRecord {
1310        JournalRecord::feature_record(
1311            base,
1312            JournalRecordKind::StreamRule,
1313            "stream_rule",
1314            self.event_kind_name(),
1315            EntityRef::new(EntityKind::StreamRule, self.rule_id.as_str()),
1316            Vec::new(),
1317            Vec::new(),
1318            JournalRecordPayload::StreamRule(self.clone()),
1319        )
1320    }
1321
1322    /// Returns the event kind name currently held by this value.
1323    /// This is data-only and does not perform I/O, call host ports, append journals, publish
1324    /// events, or start processes.
1325    pub fn event_kind_name(&self) -> &'static str {
1326        match self.record_kind {
1327            StreamRuleRecordKind::Registered => "stream_rule_registered",
1328            StreamRuleRecordKind::CompileFailed => "stream_rule_compile_failed",
1329            StreamRuleRecordKind::Matched => "stream_rule_matched",
1330            StreamRuleRecordKind::InterventionIntent => "stream_intervention_requested",
1331            StreamRuleRecordKind::InterventionResult => "stream_intervention_applied",
1332            StreamRuleRecordKind::RepeatState => "stream_rule_repeat_state_recorded",
1333        }
1334    }
1335}
1336
1337/// Validates the records::stream invariants and returns a typed error
1338/// on failure. Validation is pure and does not perform I/O, dispatch,
1339/// journal appends, or adapter calls.
1340pub(crate) fn validate_safe_regex(pattern: &str) -> Result<(), AgentError> {
1341    if pattern.is_empty() {
1342        return Err(AgentError::missing_required_field(
1343            "stream_matcher.regex.pattern",
1344        ));
1345    }
1346    if pattern.len() > 512 {
1347        return Err(AgentError::contract_violation(
1348            "stream regex pattern exceeds bounded length",
1349        ));
1350    }
1351    let forbidden = ["(?", "\\1", "\\2", "+)+", "*)+", "++", "**", "{0,"];
1352    if forbidden.iter().any(|needle| pattern.contains(needle)) {
1353        return Err(AgentError::contract_violation(
1354            "stream regex pattern uses unsupported or backtracking-prone syntax",
1355        ));
1356    }
1357    Ok(())
1358}
1359
1360fn validate_window(window_bytes: u64) -> Result<(), AgentError> {
1361    if window_bytes == 0 || window_bytes > 64 * 1024 {
1362        return Err(AgentError::contract_violation(
1363            "stream matcher window_bytes must be between 1 and 65536",
1364        ));
1365    }
1366    Ok(())
1367}
1368
1369/// Computes the stable hash rule fingerprint for this records::stream
1370/// value. The computation is deterministic and side-effect free so it
1371/// can be used in package, journal, or test evidence.
1372pub(crate) fn hash_rule_fingerprint(rule: &StreamRule) -> Result<String, AgentError> {
1373    let bytes = serde_json::to_vec(rule)
1374        .map_err(|error| AgentError::contract_violation(error.to_string()))?;
1375    Ok(format!("sha256:{:x}", Sha256::digest(bytes)))
1376}
1377
1378/// Builds the safe id fragment value.
1379/// This is data construction and performs no I/O, journal append, event publication, or process
1380pub(crate) fn safe_id_fragment(value: &str) -> String {
1381    value
1382        .chars()
1383        .map(|character| {
1384            if character.is_ascii_alphanumeric() || character == '.' || character == '_' {
1385                character
1386            } else {
1387                '.'
1388            }
1389        })
1390        .collect()
1391}
1392
1393/// Returns stream policy ref derived from the supplied state.
1394/// This is data-only and does not perform I/O, call host ports, append journals, publish
1395/// events, or start processes.
1396pub fn stream_policy_ref(id: impl Into<String>) -> PolicyRef {
1397    PolicyRef::with_kind(PolicyKind::RuntimePackage, id)
1398}