grapsus_agent_protocol/v2/
streaming.rs1use crate::{AuditMetadata, Decision, HeaderOp};
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct FlowControlSignal {
9 pub correlation_id: Option<String>,
10 pub action: FlowAction,
11 pub timestamp_ms: u64,
12}
13
14impl FlowControlSignal {
15 pub fn pause_all() -> Self {
16 Self {
17 correlation_id: None,
18 action: FlowAction::Pause,
19 timestamp_ms: now_ms(),
20 }
21 }
22
23 pub fn resume_all() -> Self {
24 Self {
25 correlation_id: None,
26 action: FlowAction::Resume,
27 timestamp_ms: now_ms(),
28 }
29 }
30
31 pub fn is_global(&self) -> bool {
32 self.correlation_id.is_none()
33 }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38#[serde(rename_all = "snake_case", tag = "type")]
39pub enum FlowAction {
40 Pause,
41 Resume,
42 UpdateCapacity { buffer_available: usize },
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct BodyChunkEventV2 {
48 pub correlation_id: String,
49 pub chunk_index: u32,
50 pub data: String,
51 pub is_last: bool,
52 pub total_size: Option<usize>,
53 pub bytes_transferred: usize,
54 pub proxy_buffer_available: usize,
55 pub timestamp_ms: u64,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct AgentResponse {
61 pub correlation_id: String,
62 pub decision: Decision,
63 #[serde(default)]
64 pub request_headers: Vec<HeaderOp>,
65 #[serde(default)]
66 pub response_headers: Vec<HeaderOp>,
67 #[serde(default)]
68 pub audit: AuditMetadata,
69 pub processing_time_ms: Option<u64>,
70 pub needs_more: bool,
71}
72
73impl AgentResponse {
74 pub fn allow(correlation_id: impl Into<String>) -> Self {
75 Self {
76 correlation_id: correlation_id.into(),
77 decision: Decision::Allow,
78 request_headers: Vec::new(),
79 response_headers: Vec::new(),
80 audit: AuditMetadata::default(),
81 processing_time_ms: None,
82 needs_more: false,
83 }
84 }
85
86 pub fn block(correlation_id: impl Into<String>, status: u16) -> Self {
87 Self {
88 correlation_id: correlation_id.into(),
89 decision: Decision::Block {
90 status,
91 body: None,
92 headers: None,
93 },
94 request_headers: Vec::new(),
95 response_headers: Vec::new(),
96 audit: AuditMetadata::default(),
97 processing_time_ms: None,
98 needs_more: false,
99 }
100 }
101
102 pub fn with_request_header(mut self, op: HeaderOp) -> Self {
103 self.request_headers.push(op);
104 self
105 }
106
107 pub fn with_processing_time(mut self, ms: u64) -> Self {
108 self.processing_time_ms = Some(ms);
109 self
110 }
111
112 pub fn with_audit(mut self, audit: AuditMetadata) -> Self {
113 self.audit = audit;
114 self
115 }
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
120pub enum StreamState {
121 #[default]
122 Disconnected,
123 Handshaking,
124 Active,
125 Paused,
126 Draining,
127 Closed,
128}
129
130impl StreamState {
131 pub fn can_accept_requests(&self) -> bool {
132 matches!(self, StreamState::Active)
133 }
134 pub fn is_connected(&self) -> bool {
135 !matches!(self, StreamState::Disconnected | StreamState::Closed)
136 }
137}
138
139fn now_ms() -> u64 {
140 std::time::SystemTime::now()
141 .duration_since(std::time::UNIX_EPOCH)
142 .map(|d| d.as_millis() as u64)
143 .unwrap_or(0)
144}
145
146#[cfg(test)]
147mod tests {
148 use super::*;
149
150 #[test]
151 fn test_flow_control_signal() {
152 let pause = FlowControlSignal::pause_all();
153 assert!(pause.is_global());
154 assert_eq!(pause.action, FlowAction::Pause);
155 }
156
157 #[test]
158 fn test_agent_response() {
159 let response = AgentResponse::allow("req-123").with_processing_time(5);
160 assert!(matches!(response.decision, Decision::Allow));
161 assert_eq!(response.processing_time_ms, Some(5));
162 }
163
164 #[test]
165 fn test_stream_state() {
166 assert!(!StreamState::Disconnected.can_accept_requests());
167 assert!(StreamState::Active.can_accept_requests());
168 assert!(StreamState::Active.is_connected());
169 assert!(!StreamState::Closed.is_connected());
170 }
171}