Skip to main content

sentinel_agent_protocol/v2/
streaming.rs

1//! Bidirectional streaming types for Protocol v2.
2
3use crate::{AuditMetadata, Decision, HeaderOp};
4use serde::{Deserialize, Serialize};
5
6/// Flow control signal for backpressure.
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct FlowControlSignal {
9    pub correlation_id: Option<String>,
10    pub action: FlowAction,
11    pub timestamp_ms: u64,
12}
13
14impl FlowControlSignal {
15    pub fn pause_all() -> Self {
16        Self {
17            correlation_id: None,
18            action: FlowAction::Pause,
19            timestamp_ms: now_ms(),
20        }
21    }
22
23    pub fn resume_all() -> Self {
24        Self {
25            correlation_id: None,
26            action: FlowAction::Resume,
27            timestamp_ms: now_ms(),
28        }
29    }
30
31    pub fn is_global(&self) -> bool {
32        self.correlation_id.is_none()
33    }
34}
35
36/// Flow control action.
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38#[serde(rename_all = "snake_case", tag = "type")]
39pub enum FlowAction {
40    Pause,
41    Resume,
42    UpdateCapacity { buffer_available: usize },
43}
44
45/// Body chunk event with flow control support.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct BodyChunkEventV2 {
48    pub correlation_id: String,
49    pub chunk_index: u32,
50    pub data: String,
51    pub is_last: bool,
52    pub total_size: Option<usize>,
53    pub bytes_transferred: usize,
54    pub proxy_buffer_available: usize,
55    pub timestamp_ms: u64,
56}
57
58/// Agent response to a processing event.
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct AgentResponse {
61    pub correlation_id: String,
62    pub decision: Decision,
63    #[serde(default)]
64    pub request_headers: Vec<HeaderOp>,
65    #[serde(default)]
66    pub response_headers: Vec<HeaderOp>,
67    #[serde(default)]
68    pub audit: AuditMetadata,
69    pub processing_time_ms: Option<u64>,
70    pub needs_more: bool,
71}
72
73impl AgentResponse {
74    pub fn allow(correlation_id: impl Into<String>) -> Self {
75        Self {
76            correlation_id: correlation_id.into(),
77            decision: Decision::Allow,
78            request_headers: Vec::new(),
79            response_headers: Vec::new(),
80            audit: AuditMetadata::default(),
81            processing_time_ms: None,
82            needs_more: false,
83        }
84    }
85
86    pub fn block(correlation_id: impl Into<String>, status: u16) -> Self {
87        Self {
88            correlation_id: correlation_id.into(),
89            decision: Decision::Block {
90                status,
91                body: None,
92                headers: None,
93            },
94            request_headers: Vec::new(),
95            response_headers: Vec::new(),
96            audit: AuditMetadata::default(),
97            processing_time_ms: None,
98            needs_more: false,
99        }
100    }
101
102    pub fn with_request_header(mut self, op: HeaderOp) -> Self {
103        self.request_headers.push(op);
104        self
105    }
106
107    pub fn with_processing_time(mut self, ms: u64) -> Self {
108        self.processing_time_ms = Some(ms);
109        self
110    }
111
112    pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
113        self.audit = audit;
114        self
115    }
116}
117
/// Stream state tracking.
///
/// The default state is [`StreamState::Disconnected`]. Only
/// [`StreamState::Active`] accepts requests; every state other than
/// `Disconnected` and `Closed` counts as connected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamState {
    /// Not connected; this is the default state.
    #[default]
    Disconnected,
    /// Handshake in progress; counts as connected but does not accept requests.
    Handshaking,
    /// The only state in which requests are accepted.
    Active,
    /// Paused; counts as connected but does not accept requests.
    Paused,
    /// Draining; counts as connected but does not accept requests.
    Draining,
    /// Closed; no longer counts as connected.
    Closed,
}
129
130impl StreamState {
131    pub fn can_accept_requests(&self) -> bool {
132        matches!(self, StreamState::Active)
133    }
134    pub fn is_connected(&self) -> bool {
135        !matches!(self, StreamState::Disconnected | StreamState::Closed)
136    }
137}
138
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` returns `u128`; the cast keeps the low 64 bits.
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// `pause_all` produces a global signal carrying the `Pause` action.
    #[test]
    fn test_flow_control_signal() {
        let signal = FlowControlSignal::pause_all();
        assert_eq!(signal.action, FlowAction::Pause);
        assert!(signal.is_global());
    }

    /// `allow` plus `with_processing_time` keeps the decision and records the time.
    #[test]
    fn test_agent_response() {
        let built = AgentResponse::allow("req-123").with_processing_time(5);
        assert_eq!(built.processing_time_ms, Some(5));
        assert!(matches!(built.decision, Decision::Allow));
    }

    /// Spot-checks the `StreamState` predicates.
    #[test]
    fn test_stream_state() {
        let active = StreamState::Active;
        assert!(active.can_accept_requests());
        assert!(active.is_connected());
        assert!(!StreamState::Disconnected.can_accept_requests());
        assert!(!StreamState::Closed.is_connected());
    }
}