sentinel_agent_protocol/
client.rs1use serde::Serialize;
4use std::time::Duration;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::UnixStream;
7
8use crate::errors::AgentProtocolError;
9use crate::protocol::{AgentRequest, AgentResponse, EventType, MAX_MESSAGE_SIZE, PROTOCOL_VERSION};
10
11pub struct AgentClient {
13 id: String,
15 connection: AgentConnection,
17 timeout: Duration,
19 #[allow(dead_code)]
21 max_retries: u32,
22}
23
24enum AgentConnection {
26 UnixSocket(UnixStream),
27 #[allow(dead_code)]
28 Grpc(tonic::transport::Channel),
29}
30
31impl AgentClient {
32 pub async fn unix_socket(
34 id: impl Into<String>,
35 path: impl AsRef<std::path::Path>,
36 timeout: Duration,
37 ) -> Result<Self, AgentProtocolError> {
38 let stream = UnixStream::connect(path.as_ref())
39 .await
40 .map_err(|e| AgentProtocolError::ConnectionFailed(e.to_string()))?;
41
42 Ok(Self {
43 id: id.into(),
44 connection: AgentConnection::UnixSocket(stream),
45 timeout,
46 max_retries: 3,
47 })
48 }
49
50 #[allow(dead_code)]
52 pub fn id(&self) -> &str {
53 &self.id
54 }
55
56 pub async fn send_event(
58 &mut self,
59 event_type: EventType,
60 payload: impl Serialize,
61 ) -> Result<AgentResponse, AgentProtocolError> {
62 let request = AgentRequest {
63 version: PROTOCOL_VERSION,
64 event_type,
65 payload: serde_json::to_value(payload)
66 .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?,
67 };
68
69 let request_bytes = serde_json::to_vec(&request)
71 .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
72
73 if request_bytes.len() > MAX_MESSAGE_SIZE {
75 return Err(AgentProtocolError::MessageTooLarge {
76 size: request_bytes.len(),
77 max: MAX_MESSAGE_SIZE,
78 });
79 }
80
81 let response = tokio::time::timeout(self.timeout, async {
83 self.send_raw(&request_bytes).await?;
84 self.receive_raw().await
85 })
86 .await
87 .map_err(|_| AgentProtocolError::Timeout(self.timeout))??;
88
89 let agent_response: AgentResponse = serde_json::from_slice(&response)
91 .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
92
93 if agent_response.version != PROTOCOL_VERSION {
95 return Err(AgentProtocolError::VersionMismatch {
96 expected: PROTOCOL_VERSION,
97 actual: agent_response.version,
98 });
99 }
100
101 Ok(agent_response)
102 }
103
104 async fn send_raw(&mut self, data: &[u8]) -> Result<(), AgentProtocolError> {
106 match &mut self.connection {
107 AgentConnection::UnixSocket(stream) => {
108 let len_bytes = (data.len() as u32).to_be_bytes();
110 stream.write_all(&len_bytes).await?;
111 stream.write_all(data).await?;
113 stream.flush().await?;
114 Ok(())
115 }
116 AgentConnection::Grpc(_channel) => {
117 unimplemented!("gRPC transport not yet implemented")
119 }
120 }
121 }
122
123 async fn receive_raw(&mut self) -> Result<Vec<u8>, AgentProtocolError> {
125 match &mut self.connection {
126 AgentConnection::UnixSocket(stream) => {
127 let mut len_bytes = [0u8; 4];
129 stream.read_exact(&mut len_bytes).await?;
130 let message_len = u32::from_be_bytes(len_bytes) as usize;
131
132 if message_len > MAX_MESSAGE_SIZE {
134 return Err(AgentProtocolError::MessageTooLarge {
135 size: message_len,
136 max: MAX_MESSAGE_SIZE,
137 });
138 }
139
140 let mut buffer = vec![0u8; message_len];
142 stream.read_exact(&mut buffer).await?;
143 Ok(buffer)
144 }
145 AgentConnection::Grpc(_channel) => {
146 unimplemented!("gRPC transport not yet implemented")
148 }
149 }
150 }
151
152 pub async fn close(self) -> Result<(), AgentProtocolError> {
154 match self.connection {
155 AgentConnection::UnixSocket(mut stream) => {
156 stream.shutdown().await?;
157 Ok(())
158 }
159 AgentConnection::Grpc(_) => Ok(()),
160 }
161 }
162}