sentinel_agent_protocol/v2/
streaming.rs1use serde::{Deserialize, Serialize};
4use crate::{AuditMetadata, Decision, HeaderOp};
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct FlowControlSignal {
9 pub correlation_id: Option<String>,
10 pub action: FlowAction,
11 pub timestamp_ms: u64,
12}
13
14impl FlowControlSignal {
15 pub fn pause_all() -> Self {
16 Self { correlation_id: None, action: FlowAction::Pause, timestamp_ms: now_ms() }
17 }
18
19 pub fn resume_all() -> Self {
20 Self { correlation_id: None, action: FlowAction::Resume, timestamp_ms: now_ms() }
21 }
22
23 pub fn is_global(&self) -> bool { self.correlation_id.is_none() }
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "snake_case", tag = "type")]
29pub enum FlowAction {
30 Pause,
31 Resume,
32 UpdateCapacity { buffer_available: usize },
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct BodyChunkEventV2 {
38 pub correlation_id: String,
39 pub chunk_index: u32,
40 pub data: String,
41 pub is_last: bool,
42 pub total_size: Option<usize>,
43 pub bytes_transferred: usize,
44 pub proxy_buffer_available: usize,
45 pub timestamp_ms: u64,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct AgentResponse {
51 pub correlation_id: String,
52 pub decision: Decision,
53 #[serde(default)]
54 pub request_headers: Vec<HeaderOp>,
55 #[serde(default)]
56 pub response_headers: Vec<HeaderOp>,
57 #[serde(default)]
58 pub audit: AuditMetadata,
59 pub processing_time_ms: Option<u64>,
60 pub needs_more: bool,
61}
62
63impl AgentResponse {
64 pub fn allow(correlation_id: impl Into<String>) -> Self {
65 Self {
66 correlation_id: correlation_id.into(),
67 decision: Decision::Allow,
68 request_headers: Vec::new(),
69 response_headers: Vec::new(),
70 audit: AuditMetadata::default(),
71 processing_time_ms: None,
72 needs_more: false,
73 }
74 }
75
76 pub fn block(correlation_id: impl Into<String>, status: u16) -> Self {
77 Self {
78 correlation_id: correlation_id.into(),
79 decision: Decision::Block { status, body: None, headers: None },
80 request_headers: Vec::new(),
81 response_headers: Vec::new(),
82 audit: AuditMetadata::default(),
83 processing_time_ms: None,
84 needs_more: false,
85 }
86 }
87
88 pub fn with_request_header(mut self, op: HeaderOp) -> Self {
89 self.request_headers.push(op);
90 self
91 }
92
93 pub fn with_processing_time(mut self, ms: u64) -> Self {
94 self.processing_time_ms = Some(ms);
95 self
96 }
97
98 pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
99 self.audit = audit;
100 self
101 }
102}
103
/// State of a stream. The default state is [`StreamState::Disconnected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamState {
    #[default]
    Disconnected,
    Handshaking,
    Active,
    Paused,
    Draining,
    Closed,
}

impl StreamState {
    /// Returns `true` only in the [`StreamState::Active`] state.
    pub fn can_accept_requests(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match *self {
            StreamState::Active => true,
            StreamState::Disconnected
            | StreamState::Handshaking
            | StreamState::Paused
            | StreamState::Draining
            | StreamState::Closed => false,
        }
    }

    /// Returns `true` in every state except [`StreamState::Disconnected`] and
    /// [`StreamState::Closed`].
    pub fn is_connected(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match *self {
            StreamState::Disconnected | StreamState::Closed => false,
            StreamState::Handshaking
            | StreamState::Active
            | StreamState::Paused
            | StreamState::Draining => true,
        }
    }
}
120
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating the `u128`
/// millisecond count.
fn now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // Fully qualified so the call does not depend on `TryFrom` being
            // in the prelude of the crate's edition.
            <u64 as std::convert::TryFrom<u128>>::try_from(elapsed.as_millis())
                .unwrap_or(u64::MAX)
        })
}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Global constructors produce signals without a correlation id.
    #[test]
    fn test_flow_control_signal() {
        let pause = FlowControlSignal::pause_all();
        assert!(pause.is_global());
        assert_eq!(pause.action, FlowAction::Pause);

        let resume = FlowControlSignal::resume_all();
        assert!(resume.is_global());
        assert_eq!(resume.action, FlowAction::Resume);
    }

    /// A signal carrying a correlation id is not global.
    #[test]
    fn test_flow_control_signal_scoped() {
        let scoped = FlowControlSignal {
            correlation_id: Some("req-123".to_string()),
            action: FlowAction::UpdateCapacity { buffer_available: 1024 },
            timestamp_ms: 0,
        };
        assert!(!scoped.is_global());
    }

    /// `allow` plus a builder call keeps the decision and records the time.
    #[test]
    fn test_agent_response() {
        let response = AgentResponse::allow("req-123").with_processing_time(5);
        assert!(matches!(response.decision, Decision::Allow));
        assert_eq!(response.correlation_id, "req-123");
        assert_eq!(response.processing_time_ms, Some(5));
        assert!(!response.needs_more);
    }

    /// `block` carries the status and leaves the optional fields at their defaults.
    #[test]
    fn test_agent_response_block() {
        let response = AgentResponse::block("req-456", 403);
        assert!(matches!(response.decision, Decision::Block { status: 403, .. }));
        assert!(response.request_headers.is_empty());
        assert!(response.response_headers.is_empty());
        assert_eq!(response.processing_time_ms, None);
        assert!(!response.needs_more);
    }

    /// Predicates over every `StreamState` variant.
    #[test]
    fn test_stream_state() {
        assert!(!StreamState::Disconnected.can_accept_requests());
        assert!(StreamState::Active.can_accept_requests());
        assert!(!StreamState::Paused.can_accept_requests());
        assert!(!StreamState::Draining.can_accept_requests());

        assert!(StreamState::Active.is_connected());
        assert!(StreamState::Handshaking.is_connected());
        assert!(StreamState::Paused.is_connected());
        assert!(StreamState::Draining.is_connected());
        assert!(!StreamState::Disconnected.is_connected());
        assert!(!StreamState::Closed.is_connected());
    }
}