loong-kernel 0.1.2-alpha.1

Internal support crate for Loong: kernel primitives and governance core
Documentation
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{Mutex, mpsc, watch};

pub const ROOT_AGENT_PATH: &str = "/root";

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct AgentPath(String);

impl AgentPath {
    pub fn from_string(raw: impl AsRef<str>) -> Result<Self, String> {
        let normalized = normalize_agent_path(raw.as_ref())?;
        Ok(Self(normalized))
    }

    pub fn root() -> Self {
        Self(ROOT_AGENT_PATH.to_owned())
    }

    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    pub fn join(&self, child: impl AsRef<str>) -> Result<Self, String> {
        let child = child.as_ref().trim();
        if child.is_empty() {
            return Err("agent_path_invalid: child segment must not be empty".to_owned());
        }
        if child.contains('/') {
            return Err("agent_path_invalid: child segment must not contain `/`".to_owned());
        }
        if !is_valid_segment(child) {
            return Err(format!(
                "agent_path_invalid: child segment `{child}` contains unsupported characters"
            ));
        }
        Self::from_string(format!("{}/{}", self.0, child))
    }
}

impl Default for AgentPath {
    fn default() -> Self {
        Self::root()
    }
}

impl AsRef<str> for AgentPath {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum MailboxContent {
    DelegateResult {
        session_id: String,
        frozen_result: Value,
    },
    StatusNotification {
        reason: String,
    },
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct InterAgentMessage {
    pub author: AgentPath,
    pub recipient: AgentPath,
    pub content: MailboxContent,
    pub trigger_turn: bool,
}

#[derive(Debug)]
struct AgentMailboxState {
    receiver: Mutex<mpsc::UnboundedReceiver<InterAgentMessage>>,
    sequence: AtomicU64,
    notifier: watch::Sender<u64>,
}

#[derive(Debug, Clone)]
pub struct AgentMailbox {
    sender: mpsc::UnboundedSender<InterAgentMessage>,
    state: Arc<AgentMailboxState>,
}

impl AgentMailbox {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        let (notifier, _) = watch::channel(0_u64);
        Self {
            sender,
            state: Arc::new(AgentMailboxState {
                receiver: Mutex::new(receiver),
                sequence: AtomicU64::new(0),
                notifier,
            }),
        }
    }

    pub fn send(&self, msg: InterAgentMessage) -> Result<(), String> {
        self.sender
            .send(msg)
            .map_err(|error| format!("agent_mailbox_closed: {error}"))?;
        let next_seq = self.state.sequence.fetch_add(1, Ordering::Relaxed) + 1;
        let _ = self.state.notifier.send(next_seq);
        Ok(())
    }

    pub fn subscribe(&self) -> watch::Receiver<u64> {
        self.state.notifier.subscribe()
    }

    pub async fn drain(&self) -> Vec<InterAgentMessage> {
        let mut receiver = self.state.receiver.lock().await;
        let mut drained = VecDeque::new();
        while let Ok(message) = receiver.try_recv() {
            drained.push_back(message);
        }
        drained.into_iter().collect()
    }
}

impl Default for AgentMailbox {
    fn default() -> Self {
        Self::new()
    }
}

fn normalize_agent_path(raw: &str) -> Result<String, String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Err("agent_path_invalid: path must not be empty".to_owned());
    }
    if !trimmed.starts_with('/') {
        return Err("agent_path_invalid: path must start with `/`".to_owned());
    }

    let mut segments = Vec::new();
    for segment in trimmed.split('/').skip(1) {
        if segment.is_empty() {
            return Err("agent_path_invalid: empty path segment".to_owned());
        }
        if !is_valid_segment(segment) {
            return Err(format!(
                "agent_path_invalid: segment `{segment}` contains unsupported characters"
            ));
        }
        segments.push(segment);
    }

    if segments.is_empty() {
        return Err("agent_path_invalid: root segment is required".to_owned());
    }

    Ok(format!("/{}", segments.join("/")))
}

fn is_valid_segment(segment: &str) -> bool {
    segment
        .chars()
        .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' || ch == '.' || ch == ':')
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    async fn mailbox_send_subscribe_drain_lifecycle() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        let author = AgentPath::root();
        let recipient = author.join("task").unwrap_or_else(|_| AgentPath::root());
        let send_result = mailbox.send(InterAgentMessage {
            author,
            recipient,
            content: MailboxContent::StatusNotification {
                reason: "child_completed".to_owned(),
            },
            trigger_turn: true,
        });
        assert!(send_result.is_ok());

        let changed_result = subscription.changed().await;
        assert!(changed_result.is_ok());

        let drained = mailbox.drain().await;
        assert_eq!(drained.len(), 1);
    }

    #[tokio::test]
    async fn mailbox_sequence_increments() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        let first = mailbox.send(InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::StatusNotification {
                reason: "first".to_owned(),
            },
            trigger_turn: false,
        });
        assert!(first.is_ok());
        let first_changed = subscription.changed().await;
        assert!(first_changed.is_ok());
        let first_seq = *subscription.borrow();

        let second = mailbox.send(InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::DelegateResult {
                session_id: "child-1".to_owned(),
                frozen_result: json!({"status": "ok"}),
            },
            trigger_turn: true,
        });
        assert!(second.is_ok());
        let second_changed = subscription.changed().await;
        assert!(second_changed.is_ok());
        let second_seq = *subscription.borrow();

        assert!(second_seq > first_seq);
    }

    #[tokio::test]
    async fn mailbox_supports_multiple_senders() {
        let mailbox = AgentMailbox::new();
        let mailbox_2 = mailbox.clone();

        let send_1 = mailbox.send(InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::StatusNotification {
                reason: "a".to_owned(),
            },
            trigger_turn: false,
        });
        assert!(send_1.is_ok());

        let send_2 = mailbox_2.send(InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::StatusNotification {
                reason: "b".to_owned(),
            },
            trigger_turn: false,
        });
        assert!(send_2.is_ok());

        let drained = mailbox.drain().await;
        assert_eq!(drained.len(), 2);
    }

    #[test]
    fn agent_path_validates_and_joins() {
        let root = AgentPath::from_string(ROOT_AGENT_PATH);
        assert!(root.is_ok());
        let root = root.unwrap_or_else(|_| AgentPath::root());

        let child = root.join("subtask");
        assert!(child.is_ok());
        let child = child.unwrap_or_else(|_| AgentPath::root());

        assert_eq!(child.as_str(), "/root/subtask");
        assert!(AgentPath::from_string("root/subtask").is_err());
        assert!(AgentPath::from_string("/root//subtask").is_err());
    }
}