// sentinel_agent_protocol/v2/streaming.rs

//! Bidirectional streaming types for Protocol v2.
2
3use serde::{Deserialize, Serialize};
4use crate::{AuditMetadata, Decision, HeaderOp};
5
6/// Flow control signal for backpressure.
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct FlowControlSignal {
9    pub correlation_id: Option<String>,
10    pub action: FlowAction,
11    pub timestamp_ms: u64,
12}
13
14impl FlowControlSignal {
15    pub fn pause_all() -> Self {
16        Self { correlation_id: None, action: FlowAction::Pause, timestamp_ms: now_ms() }
17    }
18
19    pub fn resume_all() -> Self {
20        Self { correlation_id: None, action: FlowAction::Resume, timestamp_ms: now_ms() }
21    }
22
23    pub fn is_global(&self) -> bool { self.correlation_id.is_none() }
24}
25
26/// Flow control action.
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
28#[serde(rename_all = "snake_case", tag = "type")]
29pub enum FlowAction {
30    Pause,
31    Resume,
32    UpdateCapacity { buffer_available: usize },
33}
34
35/// Body chunk event with flow control support.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct BodyChunkEventV2 {
38    pub correlation_id: String,
39    pub chunk_index: u32,
40    pub data: String,
41    pub is_last: bool,
42    pub total_size: Option<usize>,
43    pub bytes_transferred: usize,
44    pub proxy_buffer_available: usize,
45    pub timestamp_ms: u64,
46}
47
48/// Agent response to a processing event.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct AgentResponse {
51    pub correlation_id: String,
52    pub decision: Decision,
53    #[serde(default)]
54    pub request_headers: Vec<HeaderOp>,
55    #[serde(default)]
56    pub response_headers: Vec<HeaderOp>,
57    #[serde(default)]
58    pub audit: AuditMetadata,
59    pub processing_time_ms: Option<u64>,
60    pub needs_more: bool,
61}
62
63impl AgentResponse {
64    pub fn allow(correlation_id: impl Into<String>) -> Self {
65        Self {
66            correlation_id: correlation_id.into(),
67            decision: Decision::Allow,
68            request_headers: Vec::new(),
69            response_headers: Vec::new(),
70            audit: AuditMetadata::default(),
71            processing_time_ms: None,
72            needs_more: false,
73        }
74    }
75
76    pub fn block(correlation_id: impl Into<String>, status: u16) -> Self {
77        Self {
78            correlation_id: correlation_id.into(),
79            decision: Decision::Block { status, body: None, headers: None },
80            request_headers: Vec::new(),
81            response_headers: Vec::new(),
82            audit: AuditMetadata::default(),
83            processing_time_ms: None,
84            needs_more: false,
85        }
86    }
87
88    pub fn with_request_header(mut self, op: HeaderOp) -> Self {
89        self.request_headers.push(op);
90        self
91    }
92
93    pub fn with_processing_time(mut self, ms: u64) -> Self {
94        self.processing_time_ms = Some(ms);
95        self
96    }
97
98    pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
99        self.audit = audit;
100        self
101    }
102}
103
/// Lifecycle state of a stream.
///
/// The default state is [`StreamState::Disconnected`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StreamState {
    /// Not connected; this is the `Default` value.
    #[default]
    Disconnected,
    /// Counts as connected, but requests are not accepted.
    Handshaking,
    /// Counts as connected; the only state in which requests are accepted.
    Active,
    /// Counts as connected, but requests are not accepted.
    Paused,
    /// Counts as connected, but requests are not accepted.
    Draining,
    /// Not connected.
    Closed,
}

impl StreamState {
    /// Returns `true` only for [`StreamState::Active`].
    pub fn can_accept_requests(&self) -> bool {
        match self {
            Self::Active => true,
            Self::Disconnected
            | Self::Handshaking
            | Self::Paused
            | Self::Draining
            | Self::Closed => false,
        }
    }

    /// Returns `true` for every state except [`StreamState::Disconnected`] and
    /// [`StreamState::Closed`].
    pub fn is_connected(&self) -> bool {
        match self {
            Self::Disconnected | Self::Closed => false,
            Self::Handshaking | Self::Active | Self::Paused | Self::Draining => true,
        }
    }
}
120
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Best-effort by design: if the system clock reports a time before the epoch
/// the result is `0` rather than an error.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` yields a `u128`; convert with a check so an out-of-range
        // value saturates at `u64::MAX` instead of being silently truncated.
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Both global constructors produce a signal without a correlation id and
    /// with the matching action.
    #[test]
    fn test_flow_control_signal() {
        let pause = FlowControlSignal::pause_all();
        assert!(pause.is_global());
        assert_eq!(pause.action, FlowAction::Pause);

        let resume = FlowControlSignal::resume_all();
        assert!(resume.is_global());
        assert_eq!(resume.action, FlowAction::Resume);
    }

    /// A signal that names a correlation id is not global.
    #[test]
    fn test_targeted_flow_control_signal_is_not_global() {
        let signal = FlowControlSignal {
            correlation_id: Some("req-123".to_string()),
            action: FlowAction::UpdateCapacity { buffer_available: 1024 },
            timestamp_ms: 0,
        };
        assert!(!signal.is_global());
        assert_eq!(
            signal.action,
            FlowAction::UpdateCapacity { buffer_available: 1024 }
        );
    }

    /// `allow` starts from empty/default fields; the builder sets the timing.
    #[test]
    fn test_agent_response() {
        let fresh = AgentResponse::allow("req-123");
        assert_eq!(fresh.correlation_id, "req-123");
        assert!(fresh.request_headers.is_empty());
        assert!(fresh.response_headers.is_empty());
        assert_eq!(fresh.processing_time_ms, None);
        assert!(!fresh.needs_more);

        let response = fresh.with_processing_time(5);
        assert!(matches!(response.decision, Decision::Allow));
        assert_eq!(response.processing_time_ms, Some(5));
    }

    /// `block` carries the given status and otherwise matches `allow`'s defaults.
    #[test]
    fn test_agent_response_block() {
        let response = AgentResponse::block("req-456", 403);
        assert_eq!(response.correlation_id, "req-456");
        assert!(matches!(
            response.decision,
            Decision::Block { status: 403, .. }
        ));
        assert!(response.request_headers.is_empty());
        assert!(response.response_headers.is_empty());
        assert_eq!(response.processing_time_ms, None);
        assert!(!response.needs_more);
    }

    /// Covers every state for both predicates, plus the default.
    #[test]
    fn test_stream_state() {
        assert_eq!(StreamState::default(), StreamState::Disconnected);

        assert!(!StreamState::Disconnected.can_accept_requests());
        assert!(!StreamState::Handshaking.can_accept_requests());
        assert!(StreamState::Active.can_accept_requests());
        assert!(!StreamState::Paused.can_accept_requests());
        assert!(!StreamState::Draining.can_accept_requests());
        assert!(!StreamState::Closed.can_accept_requests());

        assert!(!StreamState::Disconnected.is_connected());
        assert!(StreamState::Handshaking.is_connected());
        assert!(StreamState::Active.is_connected());
        assert!(StreamState::Paused.is_connected());
        assert!(StreamState::Draining.is_connected());
        assert!(!StreamState::Closed.is_connected());
    }
}