agent_sdk_core/records/stream.rs
1//! Durable and observable SDK records. Use these DTOs for events, journals, effects,
2//! context, output, and feature evidence. Constructing records is data-only;
3//! persistence, publication, and external actions happen through ports or application
4//! coordinators. This file contains the stream portion of that contract.
5//!
6use core::fmt;
7
8use serde::{Deserialize, Deserializer, Serialize, de::Error as DeError};
9use sha2::{Digest, Sha256};
10
11use crate::{
12 domain::{
13 AgentError, AgentId, ContentRef, DestinationRef, EffectId, EntityKind, EntityRef,
14 IdValidationError, MessageId, PolicyKind, PolicyRef, PrivacyClass, RetentionClass, RunId,
15 SourceKind, SourceRef, ToolCallId, TurnId,
16 },
17 effect::{EffectIntent, EffectKind, EffectResult},
18 ids::{AttemptId, validate_identifier},
19 journal::{JournalRecord, JournalRecordBase, JournalRecordKind, JournalRecordPayload},
20};
21
// Generates a string-backed identifier newtype named `$name`.
//
// The generated type:
// - can only be built through `new` / `try_new` / `From<&str>` / `Deserialize`, all of which
//   run `validate_identifier` on the text;
// - serializes transparently as the inner string;
// - never prints the inner string through `Debug` or `Display`.
macro_rules! typed_stream_id {
    ($name:ident) => {
        #[doc = concat!(
            "Typed stream identifier for `",
            stringify!($name),
            "`. Use it to correlate deltas, rules, matchers, and interventions; ",
            "constructing it is data-only and performs no side effects."
        )]
        #[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
        #[serde(transparent)]
        pub struct $name(String);

        impl $name {
            /// Builds the identifier from caller-provided text, panicking on invalid input.
            ///
            /// This is a convenience wrapper around `try_new` for text the caller already
            /// trusts. It performs no I/O.
            ///
            /// # Panics
            ///
            /// Panics when `try_new` rejects the text, i.e. when `validate_identifier` returns
            /// an error. Use `try_new` for untrusted input.
            pub fn new(value: impl Into<String>) -> Self {
                Self::try_new(value).expect(concat!(stringify!($name), " must be valid"))
            }

            /// Builds the identifier after running `validate_identifier` on the text.
            ///
            /// # Errors
            ///
            /// Returns the `IdValidationError` produced by `validate_identifier` when the text
            /// is not an acceptable identifier.
            pub fn try_new(value: impl Into<String>) -> Result<Self, IdValidationError> {
                let value = value.into();
                validate_identifier(&value)?;
                Ok(Self(value))
            }

            /// Borrows the identifier text. Unlike `Debug`/`Display`, this exposes the raw
            /// value, so callers decide where it may be shown.
            pub fn as_str(&self) -> &str {
                &self.0
            }
        }

        // NOTE: this conversion delegates to `new`, so it panics on invalid identifier text.
        impl From<&str> for $name {
            fn from(value: &str) -> Self {
                Self::new(value)
            }
        }

        // Deserialization goes through `try_new`, so invalid identifier text is reported as a
        // deserializer error instead of producing an unvalidated value.
        impl<'de> Deserialize<'de> for $name {
            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
            where
                D: Deserializer<'de>,
            {
                let value = String::deserialize(deserializer)?;
                Self::try_new(value).map_err(D::Error::custom)
            }
        }

        // Writes only the type name followed by "(redacted)"; the inner text is never printed.
        impl fmt::Debug for $name {
            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                formatter.write_str(concat!(stringify!($name), "(redacted)"))
            }
        }

        // Same redacted rendering as `Debug`; use `as_str` when the raw text is required.
        impl fmt::Display for $name {
            fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
                formatter.write_str(concat!(stringify!($name), "(redacted)"))
            }
        }
    };
}
93
// Identifier newtypes generated by `typed_stream_id!`. Each one wraps a validated `String`,
// serializes as that string, and redacts itself in `Debug`/`Display` output.
typed_stream_id!(StreamDeltaId);
typed_stream_id!(StreamRuleId);
typed_stream_id!(StreamInterventionId);
typed_stream_id!(StreamMatchId);
typed_stream_id!(MarkerId);
typed_stream_id!(MarkerVersion);
typed_stream_id!(MatcherEngineRef);
101
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(transparent)]
/// Rule version number, serialized transparently as a bare `u32`.
///
/// `RuleVersion::new` rejects zero. The public tuple field and the derived `Deserialize` do not
/// repeat that check, so a zero value can still be produced through those two paths.
pub struct RuleVersion(pub u32);
107
108impl RuleVersion {
109 /// Creates a new records::stream value with explicit
110 /// caller-provided inputs. This constructor is data-only and
111 /// performs no I/O or external side effects.
112 pub fn new(version: u32) -> Self {
113 assert!(version > 0, "RuleVersion must be nonzero");
114 Self(version)
115 }
116}
117
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "snake_case")]
/// Channel a stream delta belongs to.
///
/// Variants serialize under their snake_case names (the same strings returned by
/// `as_contract_name`). Those names are part of the SDK contract; update fixtures when variants
/// change. Because `Ord` is derived, declaration order also defines the sort order.
pub enum StreamChannel {
    /// Assistant text (`assistant_text`).
    AssistantText,
    /// Reasoning summary (`reasoning_summary`).
    ReasoningSummary,
    /// Provider-exposed reasoning (`provider_exposed_reasoning`).
    ProviderExposedReasoning,
    /// Tool call arguments (`tool_call_arguments`).
    ToolCallArguments,
    /// Tool result text (`tool_result_text`).
    ToolResultText,
    /// Realtime transcript (`realtime_transcript`).
    RealtimeTranscript,
    /// Realtime media (`realtime_media`).
    RealtimeMedia,
    /// Hidden chain of thought (`hidden_chain_of_thought`). This is the only channel for which
    /// `is_policy_visible` returns `false`.
    HiddenChainOfThought,
}
140
141impl StreamChannel {
142 /// Reports whether this value is policy visible. The check is pure
143 /// and does not mutate SDK or host state.
144 pub fn is_policy_visible(&self) -> bool {
145 !matches!(self, Self::HiddenChainOfThought)
146 }
147
148 /// Returns this value as contract name. The accessor is side-effect
149 /// free and keeps ownership with the caller.
150 pub fn as_contract_name(&self) -> &'static str {
151 match self {
152 Self::AssistantText => "assistant_text",
153 Self::ReasoningSummary => "reasoning_summary",
154 Self::ProviderExposedReasoning => "provider_exposed_reasoning",
155 Self::ToolCallArguments => "tool_call_arguments",
156 Self::ToolResultText => "tool_result_text",
157 Self::RealtimeTranscript => "realtime_transcript",
158 Self::RealtimeMedia => "realtime_media",
159 Self::HiddenChainOfThought => "hidden_chain_of_thought",
160 }
161 }
162}
163
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "snake_case")]
/// Direction of a stream delta relative to the provider.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change.
pub enum StreamDirection {
    /// Content flowing into the provider (`input_to_provider`).
    InputToProvider,
    /// Content flowing out of the provider (`output_from_provider`).
    OutputFromProvider,
}
174
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "snake_case")]
/// How precisely a `StreamCursor` identifies a stream position.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change.
pub enum StreamCursorPrecision {
    /// The cursor carries a byte offset; set by `StreamCursor::byte`.
    ByteOffset,
    /// Only the chunk sequence is carried; set by `StreamCursor::chunk`.
    ChunkSequenceOnly,
    /// The cursor points at a labelled marker; set by `StreamCursor::marker`.
    Marker,
}
187
#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
/// Position within a stream, expressed as a chunk sequence plus optional finer detail.
///
/// This is plain data: creating or cloning it has no side effects. Use the `chunk`, `byte`, and
/// `marker` constructors to get a consistent `precision` for the fields that are populated.
pub struct StreamCursor {
    /// Sequence number of the chunk this cursor points into.
    pub chunk_sequence: u64,
    /// Byte offset of the position. The `chunk` and `marker` constructors leave it at `0`.
    pub byte_offset: u64,
    /// Which of the fields above carry meaningful position information.
    pub precision: StreamCursorPrecision,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Optional label; the `marker` constructor sets it. Omitted from serialized output when
    /// `None`.
    pub label: Option<String>,
}
204
205impl StreamCursor {
206 /// Builds the chunk value with the documented defaults.
207 /// This is data-only and does not perform I/O, call host ports, append journals, publish
208 /// events, or start processes.
209 pub fn chunk(chunk_sequence: u64) -> Self {
210 Self {
211 chunk_sequence,
212 byte_offset: 0,
213 precision: StreamCursorPrecision::ChunkSequenceOnly,
214 label: None,
215 }
216 }
217
218 /// Builds the byte value with the documented defaults.
219 /// This is data-only and does not perform I/O, call host ports, append journals, publish
220 /// events, or start processes.
221 pub fn byte(chunk_sequence: u64, byte_offset: u64) -> Self {
222 Self {
223 chunk_sequence,
224 byte_offset,
225 precision: StreamCursorPrecision::ByteOffset,
226 label: None,
227 }
228 }
229
230 /// Marker.
231 /// This is data-only and does not perform I/O, call host ports, append journals, publish
232 /// events, or start processes.
233 pub fn marker(chunk_sequence: u64, label: impl Into<String>) -> Self {
234 Self {
235 chunk_sequence,
236 byte_offset: 0,
237 precision: StreamCursorPrecision::Marker,
238 label: Some(label.into()),
239 }
240 }
241}
242
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// One increment of stream content together with its lineage, policy, and privacy metadata.
///
/// The serialized form carries only identifiers, references, and a redacted summary. The raw
/// text used for matching lives in the private `match_text` field, which serde skips.
/// Creating or cloning a delta has no side effects.
pub struct StreamDelta {
    /// Stable delta id used for typed lineage, lookup, or dedupe.
    pub delta_id: StreamDeltaId,
    /// Run identifier used for lineage, filtering, replay, and dedupe.
    pub run_id: RunId,
    /// Agent identifier used for lineage, filtering, and ownership checks.
    pub agent_id: AgentId,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Turn identifier for one loop turn within a run. Omitted when `None`.
    pub turn_id: Option<TurnId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Attempt identifier for retry, repair, provider, or tool execution evidence. Omitted
    /// when `None`.
    pub attempt_id: Option<AttemptId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Message identifier for transcript, projection, or provider-response lineage. Omitted
    /// when `None`.
    pub message_id: Option<MessageId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Tool call id used for typed lineage, lookup, or dedupe. Omitted when `None`.
    pub tool_call_id: Option<ToolCallId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Realtime session id used for typed lineage, lookup, or dedupe. Omitted when `None`.
    pub realtime_session_id: Option<crate::realtime_records::RealtimeSessionId>,
    /// Channel this delta belongs to.
    pub channel: StreamChannel,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Direction relative to the provider, when known. Omitted when `None`.
    pub direction: Option<StreamDirection>,
    /// Position of this delta within the stream.
    pub cursor: StreamCursor,
    /// Source label or ref for this item; it is metadata and does not fetch content by itself.
    pub source: SourceRef,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Destination label or ref for this item; it is metadata and does not deliver content by
    /// itself. Omitted when `None`.
    pub destination: Option<DestinationRef>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    /// Policy references that govern admission, projection, execution, or delivery. Omitted
    /// when empty and defaulted to empty when missing from the input.
    pub policy_refs: Vec<PolicyRef>,
    /// Privacy class used for projection, telemetry, and raw-content access decisions.
    /// `matcher_text` returns `None` when this is `PrivacyClass::Secret`.
    pub privacy: PrivacyClass,
    /// Retention class used by hosts and sinks when storing or exporting this item.
    pub retention: RetentionClass,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Content reference where payload bytes or structured tool output are stored. Omitted
    /// when `None`.
    pub content_ref: Option<ContentRef>,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Marker id carried by marker deltas. Omitted when `None`.
    pub marker_id: Option<MarkerId>,
    /// Redacted human-readable summary safe for events, telemetry, and logs.
    pub redacted_summary: String,
    /// Fingerprint of the runtime package snapshot in force when this value was produced.
    /// Use it for replay, dedupe, and package-lineage checks; the field is evidence and does
    /// not execute package behavior.
    pub runtime_package_fingerprint: String,
    #[serde(skip)]
    // Raw text kept in memory for matchers only. serde skips it in both directions, so it is
    // never written out and is `None` on any deserialized value.
    match_text: Option<String>,
}
312
313impl StreamDelta {
314 /// Builds the visible text value with the documented defaults.
315 /// This is data-only and does not perform I/O, call host ports, append journals, publish
316 /// events, or start processes.
317 pub fn visible_text(
318 delta_id: impl Into<String>,
319 channel: StreamChannel,
320 cursor: StreamCursor,
321 text: impl Into<String>,
322 source: SourceRef,
323 ) -> Self {
324 let text = text.into();
325 Self {
326 delta_id: StreamDeltaId::new(delta_id),
327 run_id: RunId::new("run.stream.delta.unset"),
328 agent_id: AgentId::new("agent.stream.delta.unset"),
329 turn_id: None,
330 attempt_id: None,
331 message_id: None,
332 tool_call_id: None,
333 realtime_session_id: None,
334 channel,
335 direction: None,
336 cursor,
337 source,
338 destination: None,
339 policy_refs: Vec::new(),
340 privacy: PrivacyClass::Public,
341 retention: RetentionClass::RunScoped,
342 content_ref: None,
343 marker_id: None,
344 redacted_summary: format!("{} bytes visible stream text", text.len()),
345 runtime_package_fingerprint: "runtime.package.fingerprint.test".to_string(),
346 match_text: Some(text),
347 }
348 }
349
350 /// Marker.
351 /// This is data-only and does not perform I/O, call host ports, append journals, publish
352 /// events, or start processes.
353 pub fn marker(
354 delta_id: impl Into<String>,
355 channel: StreamChannel,
356 cursor: StreamCursor,
357 marker_id: impl Into<String>,
358 source: SourceRef,
359 ) -> Self {
360 let marker_id = MarkerId::new(marker_id);
361 Self {
362 delta_id: StreamDeltaId::new(delta_id),
363 run_id: RunId::new("run.stream.delta.unset"),
364 agent_id: AgentId::new("agent.stream.delta.unset"),
365 turn_id: None,
366 attempt_id: None,
367 message_id: None,
368 tool_call_id: None,
369 realtime_session_id: None,
370 channel,
371 direction: None,
372 cursor,
373 source,
374 destination: None,
375 policy_refs: Vec::new(),
376 privacy: PrivacyClass::Internal,
377 retention: RetentionClass::RunScoped,
378 content_ref: None,
379 marker_id: Some(marker_id),
380 redacted_summary: "typed stream marker".to_string(),
381 runtime_package_fingerprint: "runtime.package.fingerprint.test".to_string(),
382 match_text: None,
383 }
384 }
385
386 /// Returns this value with its run setting replaced. The method
387 /// follows builder-style data construction and does not execute
388 /// external work.
389 pub fn with_run(mut self, run_id: RunId, agent_id: AgentId) -> Self {
390 self.run_id = run_id;
391 self.agent_id = agent_id;
392 self
393 }
394
395 /// Returns this value with its turn setting replaced. The method
396 /// follows builder-style data construction and does not execute
397 /// external work.
398 pub fn with_turn(mut self, turn_id: TurnId) -> Self {
399 self.turn_id = Some(turn_id);
400 self
401 }
402
403 /// Returns this value with its attempt setting replaced. The method
404 /// follows builder-style data construction and does not execute
405 /// external work.
406 pub fn with_attempt(mut self, attempt_id: AttemptId) -> Self {
407 self.attempt_id = Some(attempt_id);
408 self
409 }
410
411 /// Returns this value with its direction setting replaced. The
412 /// method follows builder-style data construction and does not
413 /// execute external work.
414 pub fn with_direction(mut self, direction: StreamDirection) -> Self {
415 self.direction = Some(direction);
416 self
417 }
418
419 /// Returns this value with its destination setting replaced. The
420 /// method follows builder-style data construction and does not
421 /// execute external work.
422 pub fn with_destination(mut self, destination: DestinationRef) -> Self {
423 self.destination = Some(destination);
424 self
425 }
426
427 /// Returns this value with its content ref setting replaced. The
428 /// method follows builder-style data construction and does not
429 /// execute external work.
430 pub fn with_content_ref(mut self, content_ref: ContentRef) -> Self {
431 self.content_ref = Some(content_ref);
432 self
433 }
434
435 /// Returns the matcher text currently held by this value.
436 /// This is data-only and does not perform I/O, call host ports, append journals, publish
437 /// events, or start processes.
438 pub fn matcher_text(&self) -> Option<&str> {
439 if !self.channel.is_policy_visible() || self.privacy == PrivacyClass::Secret {
440 return None;
441 }
442 self.match_text.as_deref()
443 }
444
445 /// Returns whether serialized raw text absent applies for this contract.
446 /// This is data-only and does not perform I/O, call host ports, append journals, publish
447 /// events, or start processes.
448 pub fn serialized_raw_text_absent(&self) -> bool {
449 serde_json::to_string(self)
450 .map(|json| {
451 self.match_text
452 .as_ref()
453 .is_none_or(|text| !json.contains(text))
454 })
455 .unwrap_or(false)
456 }
457}
458
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Selects stream deltas by channel and, optionally, by direction.
///
/// This is plain data: creating or cloning it has no side effects. See `matches` for the
/// selection rule.
pub struct StreamChannelSelector {
    /// Channel a delta must be on to be selected.
    pub channel: StreamChannel,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Direction a delta must have to be selected. When `None`, any direction (including a
    /// delta with no direction) is accepted. Omitted from serialized output when `None`.
    pub direction: Option<StreamDirection>,
}
470
471impl StreamChannelSelector {
472 /// Builds the channel value with the documented defaults.
473 /// This is data-only and does not perform I/O, call host ports, append journals, publish
474 /// events, or start processes.
475 pub fn channel(channel: StreamChannel) -> Self {
476 Self {
477 channel,
478 direction: None,
479 }
480 }
481
482 /// Returns matches for the current value.
483 /// This is a read-only or data-construction helper unless the method body explicitly calls
484 /// a port or store.
485 pub fn matches(&self, delta: &StreamDelta) -> bool {
486 self.channel == delta.channel
487 && self
488 .direction
489 .as_ref()
490 .is_none_or(|direction| Some(direction) == delta.direction.as_ref())
491 }
492}
493
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// Regex dialect a stream regex matcher is written in.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change.
pub enum RegexDialect {
    /// The safe regex subset (`safe_subset`); the only dialect currently defined and the one
    /// `StreamMatcher::regex_with_limits` selects.
    SafeSubset,
}
502
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
/// Declarative description of what a stream rule looks for.
///
/// Serialized as an internally tagged object whose `kind` field holds the snake_case variant
/// name. Those names are part of the SDK contract; update fixtures when variants change.
/// Constructing a matcher is data-only; call `validate` to check its invariants.
pub enum StreamMatcher {
    /// Matches literal text (`kind = "literal"`).
    Literal {
        /// Literal text to look for. `validate` rejects an empty string.
        text: String,
        /// Requests case-sensitive comparison when `true`.
        case_sensitive: bool,
        /// Matching window size in bytes. `validate` checks it with `validate_window`.
        window_bytes: u64,
    },
    /// Matches a regular expression (`kind = "regex"`).
    Regex {
        /// Regex pattern text. `validate` checks it with `validate_safe_regex`.
        pattern: String,
        /// Regex dialect the pattern is written in.
        dialect: RegexDialect,
        /// Matching window size in bytes. `validate` checks it with `validate_window`.
        window_bytes: u64,
        /// Timeout budget in milliseconds. `validate` requires a value from 1 to 1000.
        timeout_ms: u64,
    },
    /// Matches a typed marker (`kind = "marker"`).
    Marker {
        /// Id of the marker to match.
        marker_id: MarkerId,
        /// Version of the marker definition. `StreamMatcher::marker` sets it to `marker.v1`.
        marker_version: MarkerVersion,
    },
    /// Delegates matching to a host-provided engine (`kind = "host_matcher"`).
    HostMatcher {
        /// Reference to the host matcher engine. Resolving or executing it is a separate
        /// policy-gated step.
        matcher_ref: MatcherEngineRef,
        #[serde(skip_serializing_if = "Option::is_none")]
        /// Reference to the risk policy declared for the host matcher. `validate` fails when
        /// it is `None`. Omitted from serialized output when `None`.
        risk_policy_ref: Option<PolicyRef>,
    },
}
553
554impl StreamMatcher {
555 /// Builds the literal value with the documented defaults.
556 /// This is data-only and does not perform I/O, call host ports, append journals, publish
557 /// events, or start processes.
558 pub fn literal(text: impl Into<String>, case_sensitive: bool, window_bytes: u64) -> Self {
559 Self::Literal {
560 text: text.into(),
561 case_sensitive,
562 window_bytes,
563 }
564 }
565
566 /// Builds the regex with limits value with the documented defaults.
567 /// This is data-only and does not perform I/O, call host ports, append journals, publish
568 /// events, or start processes.
569 pub fn regex_with_limits(
570 pattern: impl Into<String>,
571 window_bytes: u64,
572 timeout_ms: u64,
573 ) -> Self {
574 Self::Regex {
575 pattern: pattern.into(),
576 dialect: RegexDialect::SafeSubset,
577 window_bytes,
578 timeout_ms,
579 }
580 }
581
582 /// Marker.
583 /// This is data-only and does not perform I/O, call host ports, append journals, publish
584 /// events, or start processes.
585 pub fn marker(marker_id: impl Into<String>) -> Self {
586 Self::Marker {
587 marker_id: MarkerId::new(marker_id),
588 marker_version: MarkerVersion::new("marker.v1"),
589 }
590 }
591
592 /// Builds the window bytes value with the documented defaults.
593 /// This is data-only and does not perform I/O, call host ports, append journals, publish
594 /// events, or start processes.
595 pub fn window_bytes(&self) -> u64 {
596 match self {
597 Self::Literal { window_bytes, .. } | Self::Regex { window_bytes, .. } => *window_bytes,
598 Self::Marker { .. } | Self::HostMatcher { .. } => 0,
599 }
600 }
601
602 /// Returns the kind name currently held by this value.
603 /// This is data-only and does not perform I/O, call host ports, append journals, publish
604 /// events, or start processes.
605 pub fn kind_name(&self) -> &'static str {
606 match self {
607 Self::Literal { .. } => "literal",
608 Self::Regex { .. } => "regex",
609 Self::Marker { .. } => "marker",
610 Self::HostMatcher { .. } => "host_matcher",
611 }
612 }
613
614 /// Validates the records::stream invariants and returns a typed
615 /// error on failure. Validation is pure and does not perform I/O,
616 /// dispatch, journal appends, or adapter calls.
617 pub fn validate(&self) -> Result<(), AgentError> {
618 match self {
619 Self::Literal {
620 text, window_bytes, ..
621 } => {
622 if text.is_empty() {
623 return Err(AgentError::missing_required_field(
624 "stream_matcher.literal.text",
625 ));
626 }
627 validate_window(*window_bytes)
628 }
629 Self::Regex {
630 pattern,
631 window_bytes,
632 timeout_ms,
633 ..
634 } => {
635 validate_window(*window_bytes)?;
636 if *timeout_ms == 0 || *timeout_ms > 1000 {
637 return Err(AgentError::contract_violation(
638 "stream regex timeout must be between 1 and 1000ms",
639 ));
640 }
641 validate_safe_regex(pattern)
642 }
643 Self::Marker { marker_id, .. } => {
644 if marker_id.as_str().is_empty() {
645 return Err(AgentError::missing_required_field(
646 "stream_matcher.marker.marker_id",
647 ));
648 }
649 Ok(())
650 }
651 Self::HostMatcher {
652 risk_policy_ref, ..
653 } => {
654 if risk_policy_ref.is_none() {
655 return Err(AgentError::contract_violation(
656 "host stream matcher requires declared risk policy ref",
657 ));
658 }
659 Ok(())
660 }
661 }
662 }
663}
664
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// Scope a stream rule applies to.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change.
pub enum StreamRuleScope {
    /// Scoped to a run (`run`).
    Run,
    /// Scoped to a turn (`turn`).
    Turn,
    /// Scoped to an attempt (`attempt`).
    Attempt,
    /// Scoped to a realtime session (`realtime_session`).
    RealtimeSession,
}
679
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// What should happen to output already produced when a stream action fires.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change. Selecting a variant has no side effect by itself.
pub enum PartialOutputPolicy {
    /// Keep the partial output (`keep`).
    Keep,
    /// Discard the partial output (`discard`).
    Discard,
    /// Mask the partial output (`mask`).
    Mask,
    /// Retain only a content reference (`content_ref_only`).
    ContentRefOnly,
}
694
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// How often a rule may fire.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change. Selecting a variant has no side effect by itself.
pub enum RepeatPolicy {
    /// No repeat limit (`always`).
    Always,
    /// At most once per run (`once_per_run`).
    OncePerRun,
    /// At most once per turn (`once_per_turn`).
    OncePerTurn,
    /// At most once per attempt and span (`once_per_attempt_and_span`).
    OncePerAttemptAndSpan,
}
709
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
/// How much of a match may be recorded.
///
/// Variants serialize under their snake_case names, which are part of the SDK contract; update
/// fixtures when variants change. Selecting a variant has no side effect by itself.
pub enum MatchPrivacyPolicy {
    /// Record hash, length, and summary only (`hash_length_and_summary`).
    HashLengthAndSummary,
    /// Record the raw capture when policy allows it (`raw_capture_if_policy_allows`).
    RawCaptureIfPolicyAllows,
}
720
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "action", rename_all = "snake_case")]
/// Action requested when a stream rule matches.
///
/// Serialized as an internally tagged object whose `action` field holds the snake_case variant
/// name (the same strings returned by `action_kind`). Those names are part of the SDK
/// contract; update fixtures when variants change. Selecting a variant is data-only.
pub enum StreamAction {
    /// Stop the run (`stop_run`).
    StopRun {
        /// Redacted explanation of why the run is stopped.
        reason: String,
        /// What to do with output already produced.
        partial_output: PartialOutputPolicy,
    },
    /// Abort the current work and retry (`abort_and_retry`).
    AbortAndRetry {
        /// Redacted summary of the injection; it should not expose raw private content.
        injection_summary: String,
        /// Reference to the retry policy. Resolving or executing it is a separate
        /// policy-gated step.
        retry_policy_ref: PolicyRef,
        /// What to do with output already produced. The `abort_and_retry` constructor uses
        /// `Discard`.
        partial_output: PartialOutputPolicy,
    },
    /// Pause until an approval is given (`pause_for_approval`).
    PauseForApproval {
        /// Reference to the approval policy. Resolving or executing it is a separate
        /// policy-gated step.
        approval_policy_ref: PolicyRef,
        /// Reference to the resume policy. Resolving or executing it is a separate
        /// policy-gated step.
        resume_policy_ref: PolicyRef,
    },
    /// Mask the matched content and continue (`mask_and_continue`).
    MaskAndContinue {
        /// Replacement text for the masked content.
        replacement: String,
    },
    /// Emit a notice without changing the stream (`emit_only`).
    EmitOnly {
        /// Notice kind label, stored as a plain string.
        notice_kind: String,
    },
}
766
767impl StreamAction {
768 /// Returns an updated value with mask and continue configured.
769 /// This is data-only and does not perform I/O, call host ports, append journals, publish
770 /// events, or start processes.
771 pub fn mask_and_continue(replacement: impl Into<String>) -> Self {
772 Self::MaskAndContinue {
773 replacement: replacement.into(),
774 }
775 }
776
777 /// Returns an updated value with abort and retry configured.
778 /// This is data-only and does not perform I/O, call host ports, append journals, publish
779 /// events, or start processes.
780 pub fn abort_and_retry(
781 injection_summary: impl Into<String>,
782 retry_policy_ref: PolicyRef,
783 ) -> Self {
784 Self::AbortAndRetry {
785 injection_summary: injection_summary.into(),
786 retry_policy_ref,
787 partial_output: PartialOutputPolicy::Discard,
788 }
789 }
790
791 /// Builds the emit only value with the documented defaults.
792 /// This is data-only and does not perform I/O, call host ports, append journals, publish
793 /// events, or start processes.
794 pub fn emit_only(notice_kind: impl Into<String>) -> Self {
795 Self::EmitOnly {
796 notice_kind: notice_kind.into(),
797 }
798 }
799
800 /// Returns the action kind currently held by this value.
801 /// This is data-only and does not perform I/O, call host ports, append journals, publish
802 /// events, or start processes.
803 pub fn action_kind(&self) -> &'static str {
804 match self {
805 Self::StopRun { .. } => "stop_run",
806 Self::AbortAndRetry { .. } => "abort_and_retry",
807 Self::PauseForApproval { .. } => "pause_for_approval",
808 Self::MaskAndContinue { .. } => "mask_and_continue",
809 Self::EmitOnly { .. } => "emit_only",
810 }
811 }
812
813 /// Builds the partial output policy value.
814 /// This is data construction and performs no I/O, journal append, event publication, or
815 /// process work.
816 pub fn partial_output_policy(&self) -> PartialOutputPolicy {
817 match self {
818 Self::StopRun { partial_output, .. } | Self::AbortAndRetry { partial_output, .. } => {
819 partial_output.clone()
820 }
821 Self::PauseForApproval { .. } | Self::EmitOnly { .. } => PartialOutputPolicy::Keep,
822 Self::MaskAndContinue { .. } => PartialOutputPolicy::Mask,
823 }
824 }
825
826 /// Returns the effect kind hint currently held by this value.
827 /// This is data-only and does not perform I/O, call host ports, append journals, publish
828 /// events, or start processes.
829 pub fn effect_kind_hint(&self) -> Option<EffectKind> {
830 match self {
831 Self::AbortAndRetry { .. } => Some(EffectKind::ProviderRequest),
832 Self::PauseForApproval { .. } => Some(EffectKind::ApprovalDispatch),
833 Self::StopRun { .. } | Self::MaskAndContinue { .. } | Self::EmitOnly { .. } => None,
834 }
835 }
836}
837
838#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
839/// Carries the stream rule record payload for journal, event, or fixture surfaces.
840/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
841pub struct StreamRule {
842 /// Stable identifier for this record.
843 pub id: StreamRuleId,
844 /// Version string for this capability, package, or protocol surface.
845 /// Use it for compatibility checks during package or adapter resolution.
846 pub version: RuleVersion,
847 /// Source label or ref for this item; it is metadata and does not fetch
848 /// content by itself.
849 pub source: SourceRef,
850 /// Matcher used by this record or request.
851 pub matcher: StreamMatcher,
852 /// Collection of channels values.
853 /// Ordering and membership should be treated as part of the serialized contract when
854 /// relevant.
855 pub channels: Vec<StreamChannelSelector>,
856 /// Scope used by this record or request.
857 pub scope: StreamRuleScope,
858 /// Action used by this record or request.
859 pub action: StreamAction,
860 /// Repeat used by this record or request.
861 pub repeat: RepeatPolicy,
862 /// Privacy class used for projection, telemetry, and raw-content access
863 /// decisions.
864 pub privacy: MatchPrivacyPolicy,
865 #[serde(default, skip_serializing_if = "Vec::is_empty")]
866 /// Policy references that govern admission, projection, execution, or
867 /// delivery.
868 pub policy_refs: Vec<PolicyRef>,
869}
870
871impl StreamRule {
872 /// Starts a builder for this records::stream value. Building is
873 /// data-only; runtime side effects occur only when a later
874 /// coordinator or host port executes the built configuration.
875 pub fn builder(id: StreamRuleId) -> StreamRuleBuilder {
876 StreamRuleBuilder::new(id)
877 }
878
879 /// Returns an updated value with mask regex configured.
880 /// This is data-only and does not perform I/O, call host ports, append journals, publish
881 /// events, or start processes.
882 pub fn mask_regex(id: impl Into<String>, pattern: impl Into<String>) -> StreamRuleBuilder {
883 StreamRuleBuilder::new(StreamRuleId::new(id))
884 .source(SourceRef::with_kind(
885 SourceKind::Host,
886 "source.host.stream_rules",
887 ))
888 .matcher(StreamMatcher::regex_with_limits(pattern, 4096, 25))
889 .action(StreamAction::mask_and_continue("[redacted]"))
890 }
891
892 /// Validates the records::stream invariants and returns a typed
893 /// error on failure. Validation is pure and does not perform I/O,
894 /// dispatch, journal appends, or adapter calls.
895 pub fn validate(&self) -> Result<(), AgentError> {
896 self.matcher.validate()?;
897 if self.channels.is_empty() {
898 return Err(AgentError::missing_required_field("stream_rule.channels"));
899 }
900 if self.policy_refs.is_empty() {
901 return Err(AgentError::missing_required_field(
902 "stream_rule.policy_refs",
903 ));
904 }
905 Ok(())
906 }
907}
908
909#[derive(Clone, Debug)]
910/// Carries the stream rule builder record payload for journal, event, or fixture surfaces.
911/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
912pub struct StreamRuleBuilder {
913 rule: StreamRule,
914}
915
916impl StreamRuleBuilder {
917 /// Creates a new records::stream value with explicit
918 /// caller-provided inputs. This constructor is data-only and
919 /// performs no I/O or external side effects.
920 pub fn new(id: StreamRuleId) -> Self {
921 Self {
922 rule: StreamRule {
923 id,
924 version: RuleVersion::new(1),
925 source: SourceRef::with_kind(SourceKind::Host, "source.host.stream_rules"),
926 matcher: StreamMatcher::literal("unset", true, 64),
927 channels: Vec::new(),
928 scope: StreamRuleScope::Run,
929 action: StreamAction::emit_only("notice"),
930 repeat: RepeatPolicy::OncePerAttemptAndSpan,
931 privacy: MatchPrivacyPolicy::HashLengthAndSummary,
932 policy_refs: Vec::new(),
933 },
934 }
935 }
936
937 /// Returns an updated records::stream value with source applied. This is
938 /// data construction only and does not execute the configured behavior.
939 pub fn source(mut self, source: SourceRef) -> Self {
940 self.rule.source = source;
941 self
942 }
943
944 /// Returns an updated value with matcher configured.
945 /// This is data-only and does not perform I/O, call host ports, append journals, publish
946 /// events, or start processes.
947 pub fn matcher(mut self, matcher: StreamMatcher) -> Self {
948 self.rule.matcher = matcher;
949 self
950 }
951
952 /// Returns an updated value with on configured.
953 /// This is data-only and does not perform I/O, call host ports, append journals, publish
954 /// events, or start processes.
955 pub fn on(mut self, channel: StreamChannel) -> Self {
956 self.rule
957 .channels
958 .push(StreamChannelSelector::channel(channel));
959 self
960 }
961
962 /// Returns an updated records::stream value with action applied. This is
963 /// data construction only and does not execute the configured behavior.
964 pub fn action(mut self, action: StreamAction) -> Self {
965 self.rule.action = action;
966 self
967 }
968
969 /// Returns an updated value with repeat configured.
970 /// This is data-only and does not perform I/O, call host ports, append journals, publish
971 /// events, or start processes.
972 pub fn repeat(mut self, repeat: RepeatPolicy) -> Self {
973 self.rule.repeat = repeat;
974 self
975 }
976
977 /// Returns policy for the current value.
978 /// This is a read-only or data-construction helper unless the method body explicitly calls
979 /// a port or store.
980 pub fn policy(mut self, policy_ref: PolicyRef) -> Self {
981 self.rule.policy_refs.push(policy_ref);
982 self
983 }
984
985 /// Finishes builder validation and returns the configured value.
986 /// This is data-only unless the surrounding builder explicitly
987 /// documents adapter or store access.
988 pub fn build(self) -> Result<StreamRule, AgentError> {
989 self.rule.validate()?;
990 Ok(self.rule)
991 }
992}
993
994#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
995/// Carries the redacted match record payload for journal, event, or fixture surfaces.
996/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
997pub struct RedactedMatch {
998 /// Stable match id used for typed lineage, lookup, or dedupe.
999 pub match_id: StreamMatchId,
1000 /// Channel used by this record or request.
1001 pub channel: StreamChannel,
1002 #[serde(skip_serializing_if = "Option::is_none")]
1003 /// Optional direction value.
1004 /// When absent, callers should use the documented default or skip that optional behavior.
1005 pub direction: Option<StreamDirection>,
1006 /// Cursor identifying a replay, export, or subscription position.
1007 /// Use it to resume without widening the original scope.
1008 pub cursor: StreamCursor,
1009 /// Observed byte length for the source, sidecar, or extracted record.
1010 pub byte_len: usize,
1011 /// Deterministic text hash used for stale checks, package evidence, or
1012 /// replay comparisons.
1013 pub text_hash: String,
1014 /// Redacted human-readable summary safe for events, telemetry, and logs.
1015 pub redacted_summary: String,
1016 /// Privacy class used for projection, telemetry, and raw-content access
1017 /// decisions.
1018 pub privacy: PrivacyClass,
1019 /// Retention class used by hosts and sinks when storing or exporting this
1020 /// item.
1021 pub retention: RetentionClass,
1022}
1023
1024impl RedactedMatch {
1025 /// Constructs this value from text. Use it when adapting canonical
1026 /// SDK records without introducing a second behavior path.
1027 pub fn from_text(rule: &StreamRule, delta: &StreamDelta, matched_text: &str) -> Self {
1028 let hash = format!("sha256:{:x}", Sha256::digest(matched_text.as_bytes()));
1029 Self {
1030 match_id: StreamMatchId::new(format!(
1031 "stream.match.{}.{}",
1032 safe_id_fragment(rule.id.as_str()),
1033 &hash["sha256:".len()..("sha256:".len() + 12)]
1034 )),
1035 channel: delta.channel.clone(),
1036 direction: delta.direction.clone(),
1037 cursor: delta.cursor.clone(),
1038 byte_len: matched_text.len(),
1039 text_hash: hash,
1040 redacted_summary: format!(
1041 "stream match redacted: {} bytes on {}",
1042 matched_text.len(),
1043 delta.channel.as_contract_name()
1044 ),
1045 privacy: PrivacyClass::ContentRefsOnly,
1046 retention: RetentionClass::RunScoped,
1047 }
1048 }
1049}
1050
1051#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1052/// Carries the stream match ref record payload for journal, event, or fixture surfaces.
1053/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1054pub struct StreamMatchRef {
1055 /// Stable match id used for typed lineage, lookup, or dedupe.
1056 pub match_id: StreamMatchId,
1057 /// Stable rule id used for typed lineage, lookup, or dedupe.
1058 pub rule_id: StreamRuleId,
1059 /// Version string for this capability, package, or protocol surface.
1060 /// Use it for compatibility checks during package or adapter resolution.
1061 pub rule_version: RuleVersion,
1062}
1063
1064#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1065/// Carries the stream intervention record payload for journal, event, or fixture surfaces.
1066/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1067pub struct StreamIntervention {
1068 /// Stable intervention id used for typed lineage, lookup, or dedupe.
1069 pub intervention_id: StreamInterventionId,
1070 /// Typed rule ref reference. Resolving or executing it is a separate
1071 /// policy-gated step.
1072 pub rule_ref: EntityRef,
1073 /// Requested action used by this record or request.
1074 pub requested_action: StreamAction,
1075 #[serde(skip_serializing_if = "Option::is_none")]
1076 /// Optional applied action value.
1077 /// When absent, callers should use the documented default or skip that optional behavior.
1078 pub applied_action: Option<StreamAction>,
1079 /// Typed match ref reference. Resolving or executing it is a separate
1080 /// policy-gated step.
1081 pub match_ref: StreamMatchRef,
1082 /// Redacted match used by this record or request.
1083 pub redacted_match: RedactedMatch,
1084 /// Partial output policy used by this record or request.
1085 pub partial_output_policy: PartialOutputPolicy,
1086 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1087 /// Policy references that govern admission, projection, execution, or
1088 /// delivery.
1089 pub policy_refs: Vec<PolicyRef>,
1090 #[serde(skip_serializing_if = "Option::is_none")]
1091 /// Typed effect intent ref reference. Resolving or executing it is a
1092 /// separate policy-gated step.
1093 pub effect_intent_ref: Option<EffectId>,
1094 #[serde(skip_serializing_if = "Option::is_none")]
1095 /// Typed effect result ref reference. Resolving or executing it is a
1096 /// separate policy-gated step.
1097 pub effect_result_ref: Option<EffectId>,
1098 #[serde(skip_serializing_if = "Option::is_none")]
1099 /// Optional effect intent value.
1100 /// When absent, callers should use the documented default or skip that optional behavior.
1101 pub effect_intent: Option<EffectIntent>,
1102 #[serde(skip_serializing_if = "Option::is_none")]
1103 /// Optional effect result value.
1104 /// When absent, callers should use the documented default or skip that optional behavior.
1105 pub effect_result: Option<EffectResult>,
1106}
1107
1108impl StreamIntervention {
1109 /// Builds the proposed value.
1110 /// This is data construction and performs no I/O, journal append, event publication, or
1111 /// process work.
1112 pub fn proposed(rule: &StreamRule, redacted_match: RedactedMatch) -> Self {
1113 let match_ref = StreamMatchRef {
1114 match_id: redacted_match.match_id.clone(),
1115 rule_id: rule.id.clone(),
1116 rule_version: rule.version,
1117 };
1118 Self {
1119 intervention_id: StreamInterventionId::new(format!(
1120 "stream.intervention.{}.{}",
1121 safe_id_fragment(rule.id.as_str()),
1122 safe_id_fragment(redacted_match.match_id.as_str())
1123 )),
1124 rule_ref: EntityRef::new(EntityKind::StreamRule, rule.id.as_str()),
1125 requested_action: rule.action.clone(),
1126 applied_action: None,
1127 match_ref,
1128 redacted_match,
1129 partial_output_policy: rule.action.partial_output_policy(),
1130 policy_refs: rule.policy_refs.clone(),
1131 effect_intent_ref: None,
1132 effect_result_ref: None,
1133 effect_intent: None,
1134 effect_result: None,
1135 }
1136 }
1137
1138 /// Returns this value with its effect intent setting replaced. The
1139 /// method follows builder-style data construction and does not
1140 /// execute external work.
1141 pub fn with_effect_intent(mut self, effect_id: EffectId) -> Self {
1142 if let Some(kind) = self.requested_action.effect_kind_hint() {
1143 let mut intent = EffectIntent::new(
1144 effect_id.clone(),
1145 kind,
1146 self.rule_ref.clone(),
1147 SourceRef::with_kind(SourceKind::Sdk, "source.sdk.stream_rule"),
1148 format!(
1149 "stream intervention {} requested for redacted match",
1150 self.requested_action.action_kind()
1151 ),
1152 );
1153 intent.policy_refs = self.policy_refs.clone();
1154 self.effect_intent_ref = Some(effect_id);
1155 self.effect_intent = Some(intent);
1156 }
1157 self
1158 }
1159
1160 /// Returns this value with its effect result setting replaced. The
1161 /// method follows builder-style data construction and does not
1162 /// execute external work.
1163 pub fn with_effect_result(mut self, result: EffectResult) -> Self {
1164 self.effect_result_ref = Some(result.effect_id.clone());
1165 self.effect_result = Some(result);
1166 self
1167 }
1168
1169 /// Returns the effect kind name currently held by this value.
1170 /// This is data-only and does not perform I/O, call host ports, append journals, publish
1171 /// events, or start processes.
1172 pub fn effect_kind_name(&self) -> Option<&'static str> {
1173 self.effect_intent.as_ref().map(|intent| match intent.kind {
1174 EffectKind::ProviderRequest => "provider_request",
1175 EffectKind::ToolExecution => "tool_execution",
1176 EffectKind::ApprovalDispatch => "approval_dispatch",
1177 EffectKind::MemoryWrite => "memory_write",
1178 EffectKind::ExtensionAction => "extension_action",
1179 EffectKind::OutputDelivery => "output_delivery",
1180 EffectKind::FileWrite => "file_write",
1181 EffectKind::ProcessStart => "process_start",
1182 EffectKind::ProcessSignal => "process_signal",
1183 EffectKind::IsolatedProcessStart => "isolated_process_start",
1184 EffectKind::ChildAgentStart => "child_agent_start",
1185 EffectKind::RunMessageDelivery => "run_message_delivery",
1186 EffectKind::ChildArtifactShutdown => "child_artifact_shutdown",
1187 EffectKind::DetachTransfer => "detach_transfer",
1188 EffectKind::HookMutation => "hook_mutation",
1189 })
1190 }
1191}
1192
1193#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1194#[serde(rename_all = "snake_case")]
1195/// Enumerates the finite stream rule record kind cases.
1196/// Serialized names are part of the SDK contract; update fixtures when variants change.
1197pub enum StreamRuleRecordKind {
1198 /// Use this variant when the contract needs to represent registered; selecting it has no side effect by itself.
1199 Registered,
1200 /// Use this variant when the contract needs to represent compile failed; selecting it has no side effect by itself.
1201 CompileFailed,
1202 /// Use this variant when the contract needs to represent matched; selecting it has no side effect by itself.
1203 Matched,
1204 /// Use this variant when the contract needs to represent intervention intent; selecting it has no side effect by itself.
1205 InterventionIntent,
1206 /// Use this variant when the contract needs to represent intervention result; selecting it has no side effect by itself.
1207 InterventionResult,
1208 /// Use this variant when the contract needs to represent repeat state; selecting it has no side effect by itself.
1209 RepeatState,
1210}
1211
1212#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1213/// Carries the stream rule repeat state snapshot record payload for journal, event, or fixture surfaces.
1214/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1215pub struct StreamRuleRepeatStateSnapshot {
1216 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1217 /// Collection of seen match keys values.
1218 /// Ordering and membership should be treated as part of the serialized contract when
1219 /// relevant.
1220 pub seen_match_keys: Vec<String>,
1221}
1222
1223#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
1224/// Carries the stream rule record record payload for journal, event, or fixture surfaces.
1225/// Creating or cloning it only preserves serialized SDK state; append, publish, replay, or export effects are documented on the runtime and port methods that store it.
1226pub struct StreamRuleRecord {
1227 /// Kind discriminator for record kind.
1228 /// Use it to route finite match arms without parsing display text.
1229 pub record_kind: StreamRuleRecordKind,
1230 /// Stable rule id used for typed lineage, lookup, or dedupe.
1231 pub rule_id: StreamRuleId,
1232 /// Version string for this capability, package, or protocol surface.
1233 /// Use it for compatibility checks during package or adapter resolution.
1234 pub rule_version: RuleVersion,
1235 #[serde(skip_serializing_if = "Option::is_none")]
1236 /// Optional channel value.
1237 /// When absent, callers should use the documented default or skip that optional behavior.
1238 pub channel: Option<StreamChannel>,
1239 #[serde(skip_serializing_if = "Option::is_none")]
1240 /// Optional direction value.
1241 /// When absent, callers should use the documented default or skip that optional behavior.
1242 pub direction: Option<StreamDirection>,
1243 #[serde(skip_serializing_if = "Option::is_none")]
1244 /// Cursor identifying a replay, export, or subscription position.
1245 /// Use it to resume without widening the original scope.
1246 pub cursor: Option<StreamCursor>,
1247 #[serde(skip_serializing_if = "Option::is_none")]
1248 /// Optional redacted match value.
1249 /// When absent, callers should use the documented default or skip that optional behavior.
1250 pub redacted_match: Option<RedactedMatch>,
1251 #[serde(skip_serializing_if = "Option::is_none")]
1252 /// Optional intervention value.
1253 /// When absent, callers should use the documented default or skip that optional behavior.
1254 pub intervention: Option<StreamIntervention>,
1255 #[serde(skip_serializing_if = "Option::is_none")]
1256 /// Optional repeat state value.
1257 /// When absent, callers should use the documented default or skip that optional behavior.
1258 pub repeat_state: Option<StreamRuleRepeatStateSnapshot>,
1259 #[serde(default, skip_serializing_if = "Vec::is_empty")]
1260 /// Policy references that govern admission, projection, execution, or
1261 /// delivery.
1262 pub policy_refs: Vec<PolicyRef>,
1263 /// Redacted human-readable summary safe for events, telemetry, and logs.
1264 pub redacted_summary: String,
1265}
1266
1267impl StreamRuleRecord {
1268 /// Builds the matched value.
1269 /// This is data construction and performs no I/O, journal append, event publication, or
1270 /// process work.
1271 pub fn matched(rule: &StreamRule, intervention: &StreamIntervention) -> Self {
1272 Self {
1273 record_kind: StreamRuleRecordKind::Matched,
1274 rule_id: rule.id.clone(),
1275 rule_version: rule.version,
1276 channel: Some(intervention.redacted_match.channel.clone()),
1277 direction: intervention.redacted_match.direction.clone(),
1278 cursor: Some(intervention.redacted_match.cursor.clone()),
1279 redacted_match: Some(intervention.redacted_match.clone()),
1280 intervention: Some(intervention.clone()),
1281 repeat_state: None,
1282 policy_refs: rule.policy_refs.clone(),
1283 redacted_summary: "stream rule matched redacted content".to_string(),
1284 }
1285 }
1286
1287 /// Builds the repeat state value.
1288 /// This is data construction and performs no I/O, journal append, event publication, or
1289 /// process work.
1290 pub fn repeat_state(rule: &StreamRule, repeat_state: StreamRuleRepeatStateSnapshot) -> Self {
1291 Self {
1292 record_kind: StreamRuleRecordKind::RepeatState,
1293 rule_id: rule.id.clone(),
1294 rule_version: rule.version,
1295 channel: None,
1296 direction: None,
1297 cursor: None,
1298 redacted_match: None,
1299 intervention: None,
1300 repeat_state: Some(repeat_state),
1301 policy_refs: rule.policy_refs.clone(),
1302 redacted_summary: "stream rule repeat state snapshot".to_string(),
1303 }
1304 }
1305
1306 /// Converts this value into journal record data.
1307 /// This is data-only and does not perform I/O, call host ports, append journals, publish
1308 /// events, or start processes.
1309 pub fn to_journal_record(&self, base: JournalRecordBase) -> JournalRecord {
1310 JournalRecord::feature_record(
1311 base,
1312 JournalRecordKind::StreamRule,
1313 "stream_rule",
1314 self.event_kind_name(),
1315 EntityRef::new(EntityKind::StreamRule, self.rule_id.as_str()),
1316 Vec::new(),
1317 Vec::new(),
1318 JournalRecordPayload::StreamRule(self.clone()),
1319 )
1320 }
1321
1322 /// Returns the event kind name currently held by this value.
1323 /// This is data-only and does not perform I/O, call host ports, append journals, publish
1324 /// events, or start processes.
1325 pub fn event_kind_name(&self) -> &'static str {
1326 match self.record_kind {
1327 StreamRuleRecordKind::Registered => "stream_rule_registered",
1328 StreamRuleRecordKind::CompileFailed => "stream_rule_compile_failed",
1329 StreamRuleRecordKind::Matched => "stream_rule_matched",
1330 StreamRuleRecordKind::InterventionIntent => "stream_intervention_requested",
1331 StreamRuleRecordKind::InterventionResult => "stream_intervention_applied",
1332 StreamRuleRecordKind::RepeatState => "stream_rule_repeat_state_recorded",
1333 }
1334 }
1335}
1336
1337/// Validates the records::stream invariants and returns a typed error
1338/// on failure. Validation is pure and does not perform I/O, dispatch,
1339/// journal appends, or adapter calls.
1340pub(crate) fn validate_safe_regex(pattern: &str) -> Result<(), AgentError> {
1341 if pattern.is_empty() {
1342 return Err(AgentError::missing_required_field(
1343 "stream_matcher.regex.pattern",
1344 ));
1345 }
1346 if pattern.len() > 512 {
1347 return Err(AgentError::contract_violation(
1348 "stream regex pattern exceeds bounded length",
1349 ));
1350 }
1351 let forbidden = ["(?", "\\1", "\\2", "+)+", "*)+", "++", "**", "{0,"];
1352 if forbidden.iter().any(|needle| pattern.contains(needle)) {
1353 return Err(AgentError::contract_violation(
1354 "stream regex pattern uses unsupported or backtracking-prone syntax",
1355 ));
1356 }
1357 Ok(())
1358}
1359
1360fn validate_window(window_bytes: u64) -> Result<(), AgentError> {
1361 if window_bytes == 0 || window_bytes > 64 * 1024 {
1362 return Err(AgentError::contract_violation(
1363 "stream matcher window_bytes must be between 1 and 65536",
1364 ));
1365 }
1366 Ok(())
1367}
1368
1369/// Computes the stable hash rule fingerprint for this records::stream
1370/// value. The computation is deterministic and side-effect free so it
1371/// can be used in package, journal, or test evidence.
1372pub(crate) fn hash_rule_fingerprint(rule: &StreamRule) -> Result<String, AgentError> {
1373 let bytes = serde_json::to_vec(rule)
1374 .map_err(|error| AgentError::contract_violation(error.to_string()))?;
1375 Ok(format!("sha256:{:x}", Sha256::digest(bytes)))
1376}
1377
/// Turns arbitrary text into an identifier fragment.
///
/// ASCII letters, ASCII digits, `.` and `_` are kept. Every other character,
/// including each non-ASCII character, is replaced by a single `.`. The result
/// has the same number of characters as `value`.
pub(crate) fn safe_id_fragment(value: &str) -> String {
    // The output never needs more bytes than the input: replacements are one byte each.
    let mut fragment = String::with_capacity(value.len());
    for character in value.chars() {
        let keep = matches!(character, 'a'..='z' | 'A'..='Z' | '0'..='9' | '.' | '_');
        fragment.push(if keep { character } else { '.' });
    }
    fragment
}
1392
1393/// Returns stream policy ref derived from the supplied state.
1394/// This is data-only and does not perform I/O, call host ports, append journals, publish
1395/// events, or start processes.
1396pub fn stream_policy_ref(id: impl Into<String>) -> PolicyRef {
1397 PolicyRef::with_kind(PolicyKind::RuntimePackage, id)
1398}