use crate::error::{Result, ZinitError};
use crate::protocol::ProtocolHandler;
use crate::retry::RetryStrategy;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::UnixStream;
use tokio::time::timeout;
use tracing::{debug, error, trace};
#[derive(Debug, Clone)]
pub struct ConnectionManager {
socket_path: PathBuf,
connection_timeout: Duration,
operation_timeout: Duration,
retry_strategy: RetryStrategy,
}
impl ConnectionManager {
pub fn new(
socket_path: impl AsRef<Path>,
connection_timeout: Duration,
operation_timeout: Duration,
retry_strategy: RetryStrategy,
) -> Self {
Self {
socket_path: socket_path.as_ref().to_path_buf(),
connection_timeout,
operation_timeout,
retry_strategy,
}
}
async fn connect(&self) -> Result<UnixStream> {
debug!("Connecting to Zinit socket at {:?}", self.socket_path);
let connect_future = UnixStream::connect(&self.socket_path);
match timeout(self.connection_timeout, connect_future).await {
Ok(result) => {
let stream = result.map_err(|e| {
error!("Failed to connect to Zinit socket: {}", e);
ZinitError::ConnectionError(e)
})?;
debug!("Connected to Zinit socket");
Ok(stream)
}
Err(_) => {
error!("Connection timeout after {:?}", self.connection_timeout);
Err(ZinitError::TimeoutError(self.connection_timeout))
}
}
}
pub async fn send_command(&self, command: &str) -> Result<String> {
debug!("Sending command: {}", command);
self.retry_strategy
.retry(|| async {
let stream = self.connect().await?;
let mut buf_stream = BufStream::new(stream);
trace!("Writing command to socket");
buf_stream.write_all(command.as_bytes()).await?;
buf_stream.write_all(b"\n").await?;
buf_stream.flush().await?;
trace!("Reading response from socket");
let mut response = String::new();
match timeout(self.operation_timeout, buf_stream.read_line(&mut response)).await {
Ok(result) => {
result?;
trace!("Response received: {} bytes", response.len());
if response.ends_with('\n') {
response.pop();
if response.ends_with('\r') {
response.pop();
}
}
Ok(response)
}
Err(_) => {
error!("Operation timeout after {:?}", self.operation_timeout);
Err(ZinitError::TimeoutError(self.operation_timeout))
}
}
})
.await
}
pub async fn execute_command(&self, command: &str) -> Result<serde_json::Value> {
let response = self.send_command(command).await?;
ProtocolHandler::parse_response(&response)
}
pub async fn stream_logs(&self, command: &str) -> Result<UnixStream> {
debug!("Streaming logs with command: {}", command);
let stream = self.connect().await?;
let mut buf_stream = BufStream::new(stream);
buf_stream.write_all(command.as_bytes()).await?;
buf_stream.write_all(b"\n").await?;
buf_stream.flush().await?;
Ok(buf_stream.into_inner())
}
}