rho-cli 0.1.28

Rho CLI tools for encrypted agent collaboration, dataset publishing, controlled runs, and result release workflows
Documentation
//! Async relay client (nostr-sdk): connect, publish, resolve, fetch, subscribe.
//! Used by both the `rho nostr` CLI commands and the desktop Tauri commands.

use std::time::Duration;

use nostr_sdk::prelude::*;
use serde::Serialize;

use crate::RhoResult;
use crate::nostr::{
    DM_KIND, IDENTITY_KIND, NostrIdentity, RhoEncryptedChatContent, build_dm_event,
    build_identity_event,
};

const FETCH_TIMEOUT: Duration = Duration::from_secs(8);

/// A resolved directory (kind 30382) record.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DirectoryRecord {
    pub rho_id: String,
    pub display_name: String,
    /// x-only controller pubkey (the `nostr:` prefix stripped).
    pub controller_pubkey: String,
    pub relay_url: Option<String>,
}

/// A decrypted chat message.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChatMessage {
    pub event_id: String,
    pub from_pubkey: String,
    pub from_rho_id: String,
    pub to_rho_id: String,
    pub text: String,
    pub created_at: u64,
}

pub struct RelayClient {
    client: Client,
}

impl RelayClient {
    /// Connect to a relay using the given controller identity as signer. Waits
    /// for the connection to be established so subsequent publishes aren't sent
    /// before the socket is ready.
    pub async fn connect(identity: &NostrIdentity, relay_url: &str) -> RhoResult<Self> {
        let client = Client::new(identity.keys().clone());
        client.add_relay(relay_url).await?;
        let output = client.try_connect(Duration::from_secs(10)).await;
        if output.success.is_empty() {
            return Err(format!(
                "could not connect to relay {relay_url}: {:?}",
                output.failed
            )
            .into());
        }
        Ok(Self { client })
    }

    pub async fn publish_identity(
        &self,
        identity: &NostrIdentity,
        rho_id: &str,
        display_name: &str,
        relay_url: &str,
    ) -> RhoResult<String> {
        let event = build_identity_event(identity, rho_id, display_name, relay_url)?;
        let id = event.id;
        let output = self.client.send_event(&event).await?;
        if output.success.is_empty() {
            return Err(format!("relay rejected identity record: {:?}", output.failed).into());
        }
        Ok(id.to_hex())
    }

    pub async fn send_message(
        &self,
        identity: &NostrIdentity,
        recipient_pubkey: &str,
        from_rho_id: &str,
        to_rho_id: &str,
        text: &str,
    ) -> RhoResult<String> {
        let event = build_dm_event(identity, recipient_pubkey, from_rho_id, to_rho_id, text)?;
        let id = event.id;
        let output = self.client.send_event(&event).await?;
        if output.success.is_empty() {
            return Err(format!("relay rejected message: {:?}", output.failed).into());
        }
        Ok(id.to_hex())
    }

    /// Resolve a Rho ID to its latest directory record.
    pub async fn resolve(&self, rho_id: &str) -> RhoResult<Option<DirectoryRecord>> {
        let filter = Filter::new()
            .kind(Kind::from(IDENTITY_KIND))
            .identifier(rho_id);
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let newest = events.into_iter().max_by_key(|e| e.created_at);
        Ok(newest.as_ref().and_then(parse_directory_record))
    }

    /// One-shot fetch + decrypt of all DMs addressed to this identity.
    pub async fn fetch_inbox(&self, identity: &NostrIdentity) -> RhoResult<Vec<ChatMessage>> {
        let filter = Filter::new()
            .kind(Kind::from(DM_KIND))
            .pubkey(identity.keys().public_key());
        let events = self.client.fetch_events(filter, FETCH_TIMEOUT).await?;
        let mut out: Vec<ChatMessage> = events
            .iter()
            .filter_map(|event| decrypt_event(identity, event))
            .collect();
        out.sort_by_key(|m| m.created_at);
        Ok(out)
    }

    /// Subscribe to live DMs for this identity (use with `notifications`).
    pub async fn subscribe_inbox(&self, identity: &NostrIdentity) -> RhoResult<()> {
        let filter = Filter::new()
            .kind(Kind::from(DM_KIND))
            .pubkey(identity.keys().public_key());
        self.client.subscribe(filter, None).await?;
        Ok(())
    }

    pub fn notifications(&self) -> tokio::sync::broadcast::Receiver<RelayPoolNotification> {
        self.client.notifications()
    }

    /// Subscribe and invoke `on_message` for each newly received DM. Blocks
    /// until the relay channel closes.
    pub async fn listen<F>(&self, identity: &NostrIdentity, mut on_message: F) -> RhoResult<()>
    where
        F: FnMut(ChatMessage),
    {
        self.subscribe_inbox(identity).await?;
        let mut rx = self.client.notifications();
        while let Ok(notification) = rx.recv().await {
            if let RelayPoolNotification::Event { event, .. } = notification
                && event.kind == Kind::from(DM_KIND)
                && let Some(message) = decrypt_event(identity, &event)
            {
                on_message(message);
            }
        }
        Ok(())
    }
}

pub fn parse_directory_record(event: &Event) -> Option<DirectoryRecord> {
    let value: serde_json::Value = serde_json::from_str(&event.content).ok()?;
    let rho_id = value["rho_id"].as_str()?.to_string();
    let controller = value["controller"].as_str().unwrap_or_default();
    let controller_pubkey = controller
        .strip_prefix("nostr:")
        .unwrap_or(controller)
        .to_string();
    Some(DirectoryRecord {
        rho_id,
        display_name: value["display_name"]
            .as_str()
            .unwrap_or_default()
            .to_string(),
        controller_pubkey,
        relay_url: value["resources"]["inbox"]["uri"]
            .as_str()
            .map(|s| s.to_string()),
    })
}

pub fn decrypt_event(identity: &NostrIdentity, event: &Event) -> Option<ChatMessage> {
    let content: RhoEncryptedChatContent = serde_json::from_str(&event.content).ok()?;
    let from_pubkey = event.pubkey.to_hex();
    let text = identity
        .decrypt_from(&from_pubkey, &content.envelope)
        .ok()?;
    Some(ChatMessage {
        event_id: event.id.to_hex(),
        from_pubkey,
        from_rho_id: content.from,
        to_rho_id: content.to,
        text,
        created_at: event.created_at.as_secs(),
    })
}