righvalor 0.1.0

RighValor: AI Infrastructure and Applications Framework for the Far Edge
use std::path::PathBuf;

use ractor::ActorRef;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::UnixListener,
};

use crate::{
    master::ValorMasterMessage,
    types::{ValorID, ValorIdExt},
    uds::{UdsRegistrationResponse, UdsWorkerRegistration},
};

/// # UDS Server
///
/// UDS server for Valor Master to handle worker registration requests.
/// Workers register with master via UDS for task management.
#[derive(Clone)]
pub struct UdsServer {
    /// Master's UDS socket path
    uds_path: PathBuf,
    /// Master's unique identifier
    master_id: ValorID,
    /// Master Actor Reference
    master_actor: ActorRef<ValorMasterMessage>,
}

impl UdsServer {
    /// Create a new UDS server for the master
    pub fn new(
        uds_path: PathBuf,
        master_id: &str,
        master_actor: ActorRef<ValorMasterMessage>,
    ) -> Self {
        Self {
            uds_path,
            master_id: ValorID::new_master(master_id),
            master_actor,
        }
    }

    /// Start the UDS server and listen for worker registration requests
    pub async fn start(&self) -> anyhow::Result<()> {
        // If socket exists, check if an active master is already listening
        if self.uds_path.exists() {
            match tokio::net::UnixStream::connect(&self.uds_path).await {
                Ok(_) => {
                    // Another master is active; refuse to start
                    anyhow::bail!(
                        "UDS path {} already in use by an active master; refusing to start",
                        self.uds_path.display()
                    );
                }
                Err(_) => {
                    // Stale socket; remove it
                    let _ = std::fs::remove_file(&self.uds_path);
                }
            }
        }

        // Create Unix domain socket listener
        let listener = UnixListener::bind(&self.uds_path)?;

        tracing::info!(
            "UDS server started for master {} on {}",
            self.master_id,
            self.uds_path.display()
        );

        // Handle worker registration requests
        let master_id = self.master_id.clone();
        let master_actor = self.master_actor.clone();
        let uds_path_display = self.uds_path.display().to_string();
        let srv_span = tracing::info_span!(
            "flow.master.uds.accept_loop",
            master_id = %master_id,
            uds_path = %uds_path_display
        );

        tokio::spawn({
            let srv_span = srv_span.clone();
            async move {
                let _e = srv_span.enter();
                loop {
                    match listener.accept().await {
                        Ok((mut stream, _addr)) => {
                            tracing::info!("Master {} received connection from worker", master_id);

                            // Read registration data from stream
                            let mut buffer = Vec::new();
                            let mut chunk = [0u8; 1024];

                            loop {
                                match stream.read(&mut chunk).await {
                                    Ok(n) => {
                                        if n == 0 {
                                            break; // EOF
                                        }
                                        buffer.extend_from_slice(&chunk[..n]);
                                    }
                                    Err(e) => {
                                        tracing::error!("Error reading from UDS stream: {}", e);
                                        break;
                                    }
                                }
                            }

                            // Deserialize registration
                            match serde_json::from_slice::<UdsWorkerRegistration>(&buffer) {
                                Ok(registration) => {
                                    let conn_span = tracing::info_span!(
                                        "flow.master.uds.connection",
                                        worker_id = %registration.worker_id
                                    );
                                    tracing::info!(
                                        parent: &conn_span,
                                        "Master {} received registration from worker {}",
                                        master_id,
                                        registration.worker_id
                                    );

                                    // Always accept via UDS; leave duplicate handling to Master
                                    let _ = master_actor.cast(
                                        ValorMasterMessage::SouthboundRegisterWorker(
                                            registration.worker_id,
                                            registration.ipv4_addr,
                                        ),
                                    );

                                    // Send success response
                                    let response = UdsRegistrationResponse::Accepted;
                                    let response_bytes =
                                        serde_json::to_vec(&response).unwrap_or_default();
                                    let _ = stream.write_all(&response_bytes).await;
                                }
                                Err(e) => {
                                    tracing::error!("Failed to deserialize registration: {}", e);
                                    let response = UdsRegistrationResponse::Rejected {
                                        reason: format!("Invalid registration format: {e}"),
                                    };
                                    let response_bytes =
                                        serde_json::to_vec(&response).unwrap_or_default();
                                    let _ = stream.write_all(&response_bytes).await;
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!("UDS server accept error: {}", e);
                            break;
                        }
                    }
                }
            }
        });

        Ok(())
    }
}