sentinel_agent_protocol/
client.rs

1//! Agent client for communicating with external agents.
2
3use serde::Serialize;
4use std::time::Duration;
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio::net::UnixStream;
7
8use crate::errors::AgentProtocolError;
9use crate::protocol::{AgentRequest, AgentResponse, EventType, MAX_MESSAGE_SIZE, PROTOCOL_VERSION};
10
11/// Agent client for communicating with external agents
12pub struct AgentClient {
13    /// Agent ID
14    id: String,
15    /// Connection to agent
16    connection: AgentConnection,
17    /// Timeout for agent calls
18    timeout: Duration,
19    /// Maximum retries
20    #[allow(dead_code)]
21    max_retries: u32,
22}
23
24/// Agent connection type
25enum AgentConnection {
26    UnixSocket(UnixStream),
27    #[allow(dead_code)]
28    Grpc(tonic::transport::Channel),
29}
30
31impl AgentClient {
32    /// Create a new Unix socket agent client
33    pub async fn unix_socket(
34        id: impl Into<String>,
35        path: impl AsRef<std::path::Path>,
36        timeout: Duration,
37    ) -> Result<Self, AgentProtocolError> {
38        let stream = UnixStream::connect(path.as_ref())
39            .await
40            .map_err(|e| AgentProtocolError::ConnectionFailed(e.to_string()))?;
41
42        Ok(Self {
43            id: id.into(),
44            connection: AgentConnection::UnixSocket(stream),
45            timeout,
46            max_retries: 3,
47        })
48    }
49
50    /// Get the agent ID
51    #[allow(dead_code)]
52    pub fn id(&self) -> &str {
53        &self.id
54    }
55
56    /// Send an event to the agent and get a response
57    pub async fn send_event(
58        &mut self,
59        event_type: EventType,
60        payload: impl Serialize,
61    ) -> Result<AgentResponse, AgentProtocolError> {
62        let request = AgentRequest {
63            version: PROTOCOL_VERSION,
64            event_type,
65            payload: serde_json::to_value(payload)
66                .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?,
67        };
68
69        // Serialize request
70        let request_bytes = serde_json::to_vec(&request)
71            .map_err(|e| AgentProtocolError::Serialization(e.to_string()))?;
72
73        // Check message size
74        if request_bytes.len() > MAX_MESSAGE_SIZE {
75            return Err(AgentProtocolError::MessageTooLarge {
76                size: request_bytes.len(),
77                max: MAX_MESSAGE_SIZE,
78            });
79        }
80
81        // Send with timeout
82        let response = tokio::time::timeout(self.timeout, async {
83            self.send_raw(&request_bytes).await?;
84            self.receive_raw().await
85        })
86        .await
87        .map_err(|_| AgentProtocolError::Timeout(self.timeout))??;
88
89        // Parse response
90        let agent_response: AgentResponse = serde_json::from_slice(&response)
91            .map_err(|e| AgentProtocolError::InvalidMessage(e.to_string()))?;
92
93        // Verify protocol version
94        if agent_response.version != PROTOCOL_VERSION {
95            return Err(AgentProtocolError::VersionMismatch {
96                expected: PROTOCOL_VERSION,
97                actual: agent_response.version,
98            });
99        }
100
101        Ok(agent_response)
102    }
103
104    /// Send raw bytes to agent
105    async fn send_raw(&mut self, data: &[u8]) -> Result<(), AgentProtocolError> {
106        match &mut self.connection {
107            AgentConnection::UnixSocket(stream) => {
108                // Write message length (4 bytes, big-endian)
109                let len_bytes = (data.len() as u32).to_be_bytes();
110                stream.write_all(&len_bytes).await?;
111                // Write message data
112                stream.write_all(data).await?;
113                stream.flush().await?;
114                Ok(())
115            }
116            AgentConnection::Grpc(_channel) => {
117                // TODO: Implement gRPC transport
118                unimplemented!("gRPC transport not yet implemented")
119            }
120        }
121    }
122
123    /// Receive raw bytes from agent
124    async fn receive_raw(&mut self) -> Result<Vec<u8>, AgentProtocolError> {
125        match &mut self.connection {
126            AgentConnection::UnixSocket(stream) => {
127                // Read message length (4 bytes, big-endian)
128                let mut len_bytes = [0u8; 4];
129                stream.read_exact(&mut len_bytes).await?;
130                let message_len = u32::from_be_bytes(len_bytes) as usize;
131
132                // Check message size
133                if message_len > MAX_MESSAGE_SIZE {
134                    return Err(AgentProtocolError::MessageTooLarge {
135                        size: message_len,
136                        max: MAX_MESSAGE_SIZE,
137                    });
138                }
139
140                // Read message data
141                let mut buffer = vec![0u8; message_len];
142                stream.read_exact(&mut buffer).await?;
143                Ok(buffer)
144            }
145            AgentConnection::Grpc(_channel) => {
146                // TODO: Implement gRPC transport
147                unimplemented!("gRPC transport not yet implemented")
148            }
149        }
150    }
151
152    /// Close the agent connection
153    pub async fn close(self) -> Result<(), AgentProtocolError> {
154        match self.connection {
155            AgentConnection::UnixSocket(mut stream) => {
156                stream.shutdown().await?;
157                Ok(())
158            }
159            AgentConnection::Grpc(_) => Ok(()),
160        }
161    }
162}