#![allow(dead_code)]
use super::discovery::ensure_coordinator_running;
use super::protocol::{ClientType, MessageType, RegisterAckPayload, UiType, WireMessage};
use cuenv_events::CuenvEvent;
use std::io;
use tokio::net::UnixStream;
use uuid::Uuid;
#[derive(Debug)]
pub struct CoordinatorClient {
stream: UnixStream,
client_id: Uuid,
}
impl CoordinatorClient {
pub async fn connect_as_producer(command: &str) -> io::Result<Option<Self>> {
Self::connect(ClientType::Producer {
command: command.to_string(),
})
.await
}
pub async fn connect_as_consumer(ui_type: UiType) -> io::Result<Self> {
Self::connect(ClientType::Consumer { ui_type })
.await?
.ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "coordinator not available"))
}
async fn connect(client_type: ClientType) -> io::Result<Option<Self>> {
let handle = match ensure_coordinator_running().await {
Ok(h) => h,
Err(e) => {
tracing::debug!(error = %e, "Coordinator not available");
return Ok(None);
}
};
let mut stream = match UnixStream::connect(&handle.socket).await {
Ok(s) => s,
Err(e) => {
tracing::debug!(error = %e, "Failed to connect to coordinator");
return Ok(None);
}
};
let client_id = Uuid::new_v4();
let pid = std::process::id();
let reg_msg = WireMessage::register(client_id, client_type, pid);
reg_msg.write_to(&mut stream).await?;
let ack_msg = WireMessage::read_from(&mut stream).await?;
if ack_msg.msg_type != MessageType::RegisterAck {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected registration ack",
));
}
let ack: RegisterAckPayload = serde_json::from_value(ack_msg.payload)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
if !ack.success {
return Err(io::Error::new(
io::ErrorKind::ConnectionRefused,
ack.error
.unwrap_or_else(|| "registration failed".to_string()),
));
}
tracing::debug!(client_id = %client_id, "Connected to coordinator");
Ok(Some(Self { stream, client_id }))
}
pub async fn send_event(&mut self, event: &CuenvEvent) -> io::Result<()> {
let msg = WireMessage::event(event);
msg.write_to(&mut self.stream).await
}
pub async fn recv_event(&mut self) -> io::Result<Option<CuenvEvent>> {
let msg = WireMessage::read_from(&mut self.stream).await?;
match msg.msg_type {
MessageType::Event => Ok(msg.into_event()),
MessageType::Ping => {
let pong = WireMessage::pong(msg.correlation_id);
pong.write_to(&mut self.stream).await?;
Ok(None)
}
_ => Ok(None),
}
}
pub async fn ping(&mut self) -> io::Result<()> {
let ping = WireMessage::ping();
ping.write_to(&mut self.stream).await?;
let msg = WireMessage::read_from(&mut self.stream).await?;
if msg.msg_type == MessageType::Pong {
Ok(())
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected pong response",
))
}
}
#[must_use]
pub const fn client_id(&self) -> Uuid {
self.client_id
}
pub async fn is_alive(&mut self) -> bool {
self.ping().await.is_ok()
}
}
/// Describes a coordinator process and the socket it can be reached on.
#[derive(Debug, Clone)]
pub struct CoordinatorHandle {
    /// Process id of the coordinator, when known.
    pub pid: Option<u32>,
    /// Filesystem path of the coordinator's socket.
    pub socket: std::path::PathBuf,
    /// `true` when the handle was built with [`CoordinatorHandle::new`],
    /// `false` when built with [`CoordinatorHandle::existing`].
    pub we_started_it: bool,
}

impl CoordinatorHandle {
    /// Builds a handle from its parts; the public constructors differ only in
    /// the `we_started_it` flag they pass here.
    const fn from_parts(pid: u32, socket: std::path::PathBuf, we_started_it: bool) -> Self {
        Self {
            pid: Some(pid),
            socket,
            we_started_it,
        }
    }

    /// Handle for a coordinator that was already running (`we_started_it` is `false`).
    #[must_use]
    pub const fn existing(pid: u32, socket: std::path::PathBuf) -> Self {
        Self::from_parts(pid, socket, false)
    }

    /// Handle for a coordinator started by this process (`we_started_it` is `true`).
    #[must_use]
    pub const fn new(pid: u32, socket: std::path::PathBuf) -> Self {
        Self::from_parts(pid, socket, true)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// `existing` stores the pid and socket and leaves `we_started_it` unset.
    #[test]
    fn test_coordinator_handle_existing() {
        let expected_socket = PathBuf::from("/tmp/test.sock");
        let CoordinatorHandle {
            pid,
            socket,
            we_started_it,
        } = CoordinatorHandle::existing(1234, expected_socket.clone());
        assert_eq!(pid, Some(1234));
        assert_eq!(socket, expected_socket);
        assert!(!we_started_it);
    }

    /// `new` stores the pid and socket and sets `we_started_it`.
    #[test]
    fn test_coordinator_handle_new() {
        let expected_socket = PathBuf::from("/tmp/new.sock");
        let CoordinatorHandle {
            pid,
            socket,
            we_started_it,
        } = CoordinatorHandle::new(5678, expected_socket.clone());
        assert_eq!(pid, Some(5678));
        assert_eq!(socket, expected_socket);
        assert!(we_started_it);
    }

    /// A clone carries the same values in every field.
    #[test]
    fn test_coordinator_handle_clone() {
        let original = CoordinatorHandle::new(1111, PathBuf::from("/tmp/clone.sock"));
        let copy = original.clone();
        assert_eq!(original.pid, copy.pid);
        assert_eq!(original.socket, copy.socket);
        assert_eq!(original.we_started_it, copy.we_started_it);
    }

    /// The derived `Debug` output names the type and shows pid and socket.
    #[test]
    fn test_coordinator_handle_debug() {
        let handle = CoordinatorHandle::existing(2222, PathBuf::from("/var/run/cuenv.sock"));
        let rendered = format!("{handle:?}");
        for needle in ["CoordinatorHandle", "2222", "cuenv.sock"] {
            assert!(rendered.contains(needle));
        }
    }

    /// Socket paths containing spaces are stored unchanged.
    #[test]
    fn test_coordinator_handle_socket_with_spaces() {
        let expected_socket = PathBuf::from("/tmp/path with spaces/test.sock");
        let handle = CoordinatorHandle::new(3333, expected_socket.clone());
        assert_eq!(handle.socket, expected_socket);
    }

    /// Relative socket paths are stored unchanged.
    #[test]
    fn test_coordinator_handle_relative_socket() {
        let expected_socket = PathBuf::from("./local.sock");
        let handle = CoordinatorHandle::existing(4444, expected_socket.clone());
        assert_eq!(handle.socket, expected_socket);
    }

    /// The two constructors differ only in the `we_started_it` flag.
    #[test]
    fn test_coordinator_handle_existing_vs_new_difference() {
        let shared_socket = PathBuf::from("/tmp/test.sock");
        let found = CoordinatorHandle::existing(100, shared_socket.clone());
        let spawned = CoordinatorHandle::new(100, shared_socket);
        assert_eq!(found.pid, spawned.pid);
        assert_eq!(found.socket, spawned.socket);
        assert!(!found.we_started_it);
        assert!(spawned.we_started_it);
    }
}