rho-cli 0.1.29

Rho CLI tools for encrypted agent collaboration, dataset publishing, controlled runs, and result release workflows
Documentation
//! Async relay client (nostr-sdk): connect, publish, resolve, fetch, subscribe.
//! Used by both the `rho nostr` CLI commands and the desktop Tauri commands.

use std::time::Duration;

use nostr_sdk::prelude::*;
use serde::Serialize;

use crate::RhoResult;
use crate::nostr::{
    DM_KIND, IDENTITY_KIND, NostrIdentity, REPO_KIND, RhoEncryptedChatContent, build_dm_event,
    build_identity_event, build_repo_event,
};

/// Timeout handed to every `fetch_events` call made by [`RelayClient`].
const FETCH_TIMEOUT: Duration = Duration::from_secs(8);

/// A resolved directory (kind 30382) record.
///
/// Produced by [`parse_directory_record`] from an event's JSON content.
/// Serializes with camelCase field names (`rhoId`, `displayName`, ...).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DirectoryRecord {
    /// The `rho_id` string from the record content.
    pub rho_id: String,
    /// The `display_name` from the record content; empty when absent.
    pub display_name: String,
    /// x-only controller pubkey (the `nostr:` prefix stripped).
    /// Empty when the record carries no `controller` string.
    pub controller_pubkey: String,
    /// The `resources.inbox.uri` value from the record content, if present.
    pub relay_url: Option<String>,
}

/// A discovered repo (kind 30383) record.
///
/// Produced by [`parse_repo_record`] from an event's JSON content.
/// Serializes with camelCase field names (`rhoRepoId`, `ownerRhoId`, ...).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RepoListing {
    /// The `rho_repo_id` from the record content; empty when absent.
    pub rho_repo_id: String,
    /// The `slug` from the record content (the only mandatory field).
    pub slug: String,
    /// The `owner` value from the record content; empty when absent.
    pub owner_rho_id: String,
    /// The `web_url` from the record content; empty when absent.
    pub web_url: String,
    /// The `description` from the record content; empty when absent.
    pub description: String,
}

/// A decrypted chat message.
///
/// Produced by [`decrypt_event`]. Serializes with camelCase field names
/// (`eventId`, `fromPubkey`, ...).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChatMessage {
    /// Hex encoding of the carrying event's id.
    pub event_id: String,
    /// Hex encoding of the pubkey that authored the carrying event.
    pub from_pubkey: String,
    /// The `from` value of the encrypted chat content.
    pub from_rho_id: String,
    /// The `to` value of the encrypted chat content.
    pub to_rho_id: String,
    /// The decrypted message body.
    pub text: String,
    /// The carrying event's `created_at` timestamp, in seconds.
    pub created_at: u64,
}

/// Async relay client wrapping a nostr-sdk [`Client`].
///
/// Build one with [`RelayClient::connect`]; every other method goes through
/// the wrapped client.
pub struct RelayClient {
    /// The underlying nostr-sdk client, created in `connect` with the
    /// identity's keys.
    client: Client,
}

impl RelayClient {
    /// Connect to a relay using the given controller identity as signer. Waits
    /// for the connection to be established so subsequent publishes aren't sent
    /// before the socket is ready.
    pub async fn connect(identity: &NostrIdentity, relay_url: &str) -> RhoResult<Self> {
        let client = Client::new(identity.keys().clone());
        client.add_relay(relay_url).await?;
        let output = client.try_connect(Duration::from_secs(10)).await;
        if output.success.is_empty() {
            return Err(format!(
                "could not connect to relay {relay_url}: {:?}",
                output.failed
            )
            .into());
        }
        Ok(Self { client })
    }

    pub async fn publish_identity(
        &self,
        identity: &NostrIdentity,
        rho_id: &str,
        display_name: &str,
        relay_url: &str,
    ) -> RhoResult<String> {
        let event = build_identity_event(identity, rho_id, display_name, relay_url)?;
        let id = event.id;
        let output = self.client.send_event(&event).await?;
        if output.success.is_empty() {
            return Err(format!("relay rejected identity record: {:?}", output.failed).into());
        }
        Ok(id.to_hex())
    }

    pub async fn send_message(
        &self,
        identity: &NostrIdentity,
        recipient_pubkey: &str,
        from_rho_id: &str,
        to_rho_id: &str,
        text: &str,
    ) -> RhoResult<String> {
        let event = build_dm_event(identity, recipient_pubkey, from_rho_id, to_rho_id, text)?;
        let id = event.id;
        let output = self.client.send_event(&event).await?;
        if output.success.is_empty() {
            return Err(format!("relay rejected message: {:?}", output.failed).into());
        }
        Ok(id.to_hex())
    }

    /// Resolve a Rho ID to its latest directory record.
    pub async fn resolve(&self, rho_id: &str) -> RhoResult<Option<DirectoryRecord>> {
        let filter = Filter::new()
            .kind(Kind::from(IDENTITY_KIND))
            .identifier(rho_id);
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let newest = events.into_iter().max_by_key(|e| e.created_at);
        Ok(newest.as_ref().and_then(parse_directory_record))
    }

    /// Publish a repo discovery record (kind 30383).
    pub async fn publish_repo(
        &self,
        identity: &NostrIdentity,
        slug: &str,
        rho_repo_id: &str,
        owner_rho_id: &str,
        web_url: &str,
        description: &str,
    ) -> RhoResult<String> {
        let event = build_repo_event(
            identity,
            slug,
            rho_repo_id,
            owner_rho_id,
            web_url,
            description,
        )?;
        let id = event.id;
        let output = self.client.send_event(&event).await?;
        if output.success.is_empty() {
            return Err(format!("relay rejected repo record: {:?}", output.failed).into());
        }
        Ok(id.to_hex())
    }

    /// List every repo (kind 30383) record on the relay, newest per slug.
    pub async fn list_repos(&self) -> RhoResult<Vec<RepoListing>> {
        use std::collections::HashMap;
        let filter = Filter::new().kind(Kind::from(REPO_KIND));
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let mut latest: HashMap<String, (u64, RepoListing)> = HashMap::new();
        for event in events.iter() {
            if let Some(listing) = parse_repo_record(event) {
                let ts = event.created_at.as_secs();
                match latest.get(&listing.slug) {
                    Some((seen, _)) if *seen >= ts => {}
                    _ => {
                        latest.insert(listing.slug.clone(), (ts, listing));
                    }
                }
            }
        }
        let mut out: Vec<RepoListing> = latest.into_values().map(|(_, r)| r).collect();
        out.sort_by(|a, b| a.slug.cmp(&b.slug));
        Ok(out)
    }

    /// List every directory (kind 30382) record on the relay, newest per Rho ID.
    pub async fn list_directory(&self) -> RhoResult<Vec<DirectoryRecord>> {
        use std::collections::HashMap;
        let filter = Filter::new().kind(Kind::from(IDENTITY_KIND));
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let mut latest: HashMap<String, (u64, DirectoryRecord)> = HashMap::new();
        for event in events.iter() {
            if let Some(record) = parse_directory_record(event) {
                let ts = event.created_at.as_secs();
                match latest.get(&record.rho_id) {
                    Some((seen, _)) if *seen >= ts => {}
                    _ => {
                        latest.insert(record.rho_id.clone(), (ts, record));
                    }
                }
            }
        }
        let mut out: Vec<DirectoryRecord> = latest.into_values().map(|(_, r)| r).collect();
        out.sort_by(|a, b| a.rho_id.cmp(&b.rho_id));
        Ok(out)
    }

    /// One-shot fetch + decrypt of all DMs addressed to this identity.
    pub async fn fetch_inbox(&self, identity: &NostrIdentity) -> RhoResult<Vec<ChatMessage>> {
        let filter = Filter::new()
            .kind(Kind::from(DM_KIND))
            .pubkey(identity.keys().public_key());
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let mut out: Vec<ChatMessage> = events
            .iter()
            .filter_map(|event| decrypt_event(identity, event))
            .collect();
        out.sort_by_key(|m| m.created_at);
        Ok(out)
    }

    /// Subscribe to live DMs for this identity (use with `notifications`).
    pub async fn subscribe_inbox(&self, identity: &NostrIdentity) -> RhoResult<()> {
        let filter = Filter::new()
            .kind(Kind::from(DM_KIND))
            .pubkey(identity.keys().public_key());
        self.client.subscribe(filter, None).await?;
        Ok(())
    }

    pub fn notifications(&self) -> tokio::sync::broadcast::Receiver<RelayPoolNotification> {
        self.client.notifications()
    }

    /// Subscribe and invoke `on_message` for each newly received DM. Blocks
    /// until the relay channel closes.
    pub async fn listen<F>(&self, identity: &NostrIdentity, mut on_message: F) -> RhoResult<()>
    where
        F: FnMut(ChatMessage),
    {
        self.subscribe_inbox(identity).await?;
        let mut rx = self.client.notifications();
        while let Ok(notification) = rx.recv().await {
            if let RelayPoolNotification::Event { event, .. } = notification
                && event.kind == Kind::from(DM_KIND)
                && let Some(message) = decrypt_event(identity, &event)
            {
                on_message(message);
            }
        }
        Ok(())
    }
}

/// Parse an event's JSON content into a [`DirectoryRecord`].
///
/// Returns `None` when the content is not valid JSON or carries no string
/// `rho_id`. The remaining fields are optional: a missing `display_name` or
/// `controller` becomes an empty string, and a missing `resources.inbox.uri`
/// leaves `relay_url` as `None`.
pub fn parse_directory_record(event: &Event) -> Option<DirectoryRecord> {
    let json = serde_json::from_str::<serde_json::Value>(&event.content).ok()?;
    let rho_id = json["rho_id"].as_str()?;

    // The controller may carry a `nostr:` prefix; keep only what follows it.
    let controller_pubkey = match json["controller"].as_str() {
        Some(raw) => raw.strip_prefix("nostr:").unwrap_or(raw),
        None => "",
    };
    let display_name = json["display_name"].as_str().unwrap_or("");
    let relay_url = json["resources"]["inbox"]["uri"]
        .as_str()
        .map(str::to_owned);

    Some(DirectoryRecord {
        rho_id: rho_id.to_owned(),
        display_name: display_name.to_owned(),
        controller_pubkey: controller_pubkey.to_owned(),
        relay_url,
    })
}

/// Parse an event's JSON content into a [`RepoListing`].
///
/// Returns `None` when the content is not valid JSON or carries no string
/// `slug`. Every other field falls back to an empty string when it is absent
/// or not a string.
pub fn parse_repo_record(event: &Event) -> Option<RepoListing> {
    let json = serde_json::from_str::<serde_json::Value>(&event.content).ok()?;
    // Optional string field lookup: absent or non-string values become "".
    let text_or_empty = |key: &str| json[key].as_str().unwrap_or("").to_owned();

    let slug = json["slug"].as_str()?.to_owned();
    Some(RepoListing {
        rho_repo_id: text_or_empty("rho_repo_id"),
        slug,
        owner_rho_id: text_or_empty("owner"),
        web_url: text_or_empty("web_url"),
        description: text_or_empty("description"),
    })
}

/// Decode and decrypt a DM event into a [`ChatMessage`].
///
/// Returns `None` when the event content does not deserialize as
/// `RhoEncryptedChatContent`, or when `identity` cannot decrypt the envelope
/// using the event author's pubkey.
pub fn decrypt_event(identity: &NostrIdentity, event: &Event) -> Option<ChatMessage> {
    let payload = serde_json::from_str::<RhoEncryptedChatContent>(&event.content).ok()?;
    let sender_hex = event.pubkey.to_hex();
    match identity.decrypt_from(&sender_hex, &payload.envelope) {
        Ok(text) => Some(ChatMessage {
            event_id: event.id.to_hex(),
            from_pubkey: sender_hex,
            from_rho_id: payload.from,
            to_rho_id: payload.to,
            text,
            created_at: event.created_at.as_secs(),
        }),
        Err(_) => None,
    }
}