use anyhow::Result;
use crossbeam_channel::{Receiver, TryRecvError};
use nostr::ToBech32;
use nostr_double_ratchet::{
CreateGroupOptions, FanoutGroupMetadataOptions, FileStorageAdapter, GroupManager,
GroupManagerOptions, GroupSendEvent, Session, SessionManager, SessionManagerEvent,
StorageAdapter, CHAT_MESSAGE_KIND, REACTION_KIND,
};
use nostr_sdk::Client;
use serde::Serialize;
use std::sync::Arc;
use crate::config::Config;
use crate::nostr_client::send_event_or_ignore;
use crate::output::Output;
use crate::state_sync::{
extract_control_stamp_from_unsigned, select_canonical_session, ControlStamp,
};
use crate::storage::{Storage, StoredGroup, StoredGroupMessage};
#[derive(Serialize)]
struct GroupList {
groups: Vec<GroupInfo>,
}
#[derive(Serialize)]
struct GroupInfo {
id: String,
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
picture: Option<String>,
members: Vec<String>,
admins: Vec<String>,
created_at: u64,
}
fn hex_to_npub(hex: &str) -> String {
nostr::PublicKey::from_hex(hex)
.ok()
.and_then(|pk| pk.to_bech32().ok())
.unwrap_or_else(|| hex.to_string())
}
impl From<&nostr_double_ratchet::group::GroupData> for GroupInfo {
fn from(g: &nostr_double_ratchet::group::GroupData) -> Self {
GroupInfo {
id: g.id.clone(),
name: g.name.clone(),
description: g.description.clone(),
picture: g.picture.clone(),
members: g.members.iter().map(|m| hex_to_npub(m)).collect(),
admins: g.admins.iter().map(|a| hex_to_npub(a)).collect(),
created_at: g.created_at,
}
}
}
#[derive(Serialize)]
struct GroupMessageInfo {
id: String,
group_id: String,
sender_pubkey: String,
content: String,
timestamp: u64,
is_outgoing: bool,
}
#[derive(Serialize)]
struct GroupMessageList {
group_id: String,
messages: Vec<GroupMessageInfo>,
}
struct PairwiseSessionEventQueue<'a> {
my_owner_pubkey_hex: &'a str,
storage: &'a Storage,
session_manager: &'a SessionManager,
session_manager_rx: &'a Receiver<SessionManagerEvent>,
queued_events: &'a mut Vec<nostr::Event>,
}
fn import_chats_into_session_manager(
storage: &Storage,
manager: &SessionManager,
my_owner_pubkey_hex: &str,
) -> Result<()> {
let known: std::collections::HashMap<(String, String), String> = manager
.export_active_sessions()
.into_iter()
.filter_map(|(owner, device_id, state)| {
serde_json::to_string(&state)
.ok()
.map(|json| ((owner.to_hex(), device_id), json))
})
.collect();
for chat in storage.list_chats()? {
if chat.their_pubkey == my_owner_pubkey_hex {
continue;
}
let owner_pubkey = match nostr::PublicKey::from_hex(&chat.their_pubkey) {
Ok(pk) => pk,
Err(_) => continue,
};
manager.setup_user(owner_pubkey);
let device_id = chat.device_id.clone().unwrap_or_else(|| chat.id.clone());
if known
.get(&(owner_pubkey.to_hex(), device_id.clone()))
.is_some_and(|known_state| known_state == &chat.session_state)
{
continue;
}
let state: nostr_double_ratchet::SessionState =
match serde_json::from_str(&chat.session_state) {
Ok(state) => state,
Err(_) => continue,
};
manager.import_session_state(owner_pubkey, Some(device_id), state)?;
}
Ok(())
}
fn sync_member_chats_from_session_manager(
storage: &Storage,
manager: &SessionManager,
member_owner_pubkey_hex: &str,
) -> Result<()> {
use std::collections::HashMap;
let sessions_by_device: HashMap<String, nostr_double_ratchet::SessionState> = manager
.export_active_sessions()
.into_iter()
.filter_map(|(owner_pubkey, device_id, state)| {
(owner_pubkey.to_hex() == member_owner_pubkey_hex).then_some((device_id, state))
})
.collect();
if sessions_by_device.is_empty() {
return Ok(());
}
let mut owner_sessions: Vec<(String, nostr_double_ratchet::SessionState)> =
sessions_by_device.into_iter().collect();
owner_sessions.sort_by(|a, b| a.0.cmp(&b.0));
let Some((selected_device_id, selected_state)) =
select_canonical_session(member_owner_pubkey_hex, &owner_sessions)
else {
return Ok(());
};
let state_json = serde_json::to_string(&selected_state)?;
for mut chat in storage
.list_chats()?
.into_iter()
.filter(|c| c.their_pubkey == member_owner_pubkey_hex)
{
let mut changed = false;
if chat.device_id.as_deref() != Some(selected_device_id.as_str()) {
chat.device_id = Some(selected_device_id.clone());
changed = true;
}
if chat.session_state != state_json {
chat.session_state = state_json.clone();
changed = true;
}
if changed {
storage.save_chat(&chat)?;
}
}
Ok(())
}
fn build_session_manager(
config: &Config,
storage: &Storage,
) -> Result<(SessionManager, Receiver<SessionManagerEvent>)> {
let our_private_key = config.private_key_bytes()?;
let our_pubkey_hex = config.public_key()?;
let our_pubkey = nostr::PublicKey::from_hex(&our_pubkey_hex)?;
let owner_pubkey_hex = config.owner_public_key_hex()?;
let owner_pubkey = nostr::PublicKey::from_hex(&owner_pubkey_hex)?;
let session_manager_store: Arc<dyn StorageAdapter> = Arc::new(FileStorageAdapter::new(
storage.data_dir().join("session_manager"),
)?);
let (tx, rx) = crossbeam_channel::unbounded();
let manager = SessionManager::new(
our_pubkey,
our_private_key,
our_pubkey_hex,
owner_pubkey,
tx,
Some(session_manager_store),
None,
);
manager.init()?;
import_chats_into_session_manager(storage, &manager, &owner_pubkey_hex)?;
while rx.try_recv().is_ok() {}
Ok((manager, rx))
}
fn drain_session_manager_message_events(rx: &Receiver<SessionManagerEvent>) -> Vec<nostr::Event> {
let mut message_events = Vec::new();
loop {
match rx.try_recv() {
Ok(SessionManagerEvent::PublishSigned(event))
if event.kind.as_u16() == nostr_double_ratchet::MESSAGE_EVENT_KIND as u16 =>
{
message_events.push(event);
}
Ok(_) => {}
Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
}
}
message_events
}
async fn publish_events_with_retry(
client: &Client,
events: &[nostr::Event],
attempts: usize,
retry_ms: u64,
) -> bool {
if events.is_empty() {
return false;
}
for attempt in 0..attempts {
let mut all_ok = true;
for ev in events {
if send_event_or_ignore(client, ev.clone()).await.is_err() {
all_ok = false;
break;
}
}
if all_ok {
return true;
}
if attempt + 1 < attempts {
tokio::time::sleep(std::time::Duration::from_millis(retry_ms)).await;
}
}
false
}
async fn fan_out_metadata(
group: &nostr_double_ratchet::group::GroupData,
excluded_member: Option<&str>,
config: &Config,
storage: &Storage,
) -> Result<ControlStamp> {
let my_owner_pubkey = config.owner_public_key_hex()?;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64;
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let mut group_manager = build_group_manager(config, storage)?;
let (session_manager, session_manager_rx) = build_session_manager(config, storage)?;
let mut pairwise_events: Vec<nostr::Event> = Vec::new();
let mut pairwise_queue = PairwiseSessionEventQueue {
my_owner_pubkey_hex: &my_owner_pubkey,
storage,
session_manager: &session_manager,
session_manager_rx: &session_manager_rx,
queued_events: &mut pairwise_events,
};
let mut send_pairwise = |recipient_owner: nostr::PublicKey,
rumor: &nostr::UnsignedEvent|
-> nostr_double_ratchet::Result<()> {
let recipient_owner_hex = recipient_owner.to_hex();
let _ = queue_pairwise_session_events_for_recipient(
recipient_owner,
&recipient_owner_hex,
rumor,
&mut pairwise_queue,
);
Ok(())
};
let fanout = group_manager
.fan_out_group_metadata(
group.clone(),
FanoutGroupMetadataOptions {
send_pairwise: &mut send_pairwise,
exclude_secret_for: excluded_member,
now_ms: Some(now_ms),
},
)
.map_err(|e| anyhow::anyhow!("Failed to fan out group metadata: {}", e))?;
let stamp = extract_control_stamp_from_unsigned(&fanout.metadata_rumor)
.ok_or_else(|| anyhow::anyhow!("group metadata rumor missing control stamp"))?;
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let _ = publish_events_with_retry(
&client,
&pairwise_events,
MAX_DELIVERY_ATTEMPTS,
DELIVERY_RETRY_MS,
)
.await;
Ok(stamp)
}
fn build_group_manager(config: &Config, storage: &Storage) -> Result<GroupManager> {
let our_owner_pubkey = nostr::PublicKey::from_hex(&config.owner_public_key_hex()?)?;
let our_device_pubkey = nostr::PublicKey::from_hex(&config.public_key()?)?;
let group_manager_store: Arc<dyn StorageAdapter> = Arc::new(FileStorageAdapter::new(
storage.data_dir().join("group_manager"),
)?);
Ok(GroupManager::new(GroupManagerOptions {
our_owner_pubkey,
our_device_pubkey,
storage: Some(group_manager_store),
one_to_many: None,
}))
}
fn invalidate_group_manager_sender_state(
group_id: &str,
my_device_pubkey_hex: &str,
storage: &Storage,
) -> Result<()> {
let store = FileStorageAdapter::new(storage.data_dir().join("group_manager"))?;
let prefix = format!("v1/broadcast-channel/group/{group_id}/sender/{my_device_pubkey_hex}/");
let keys = store.list(&prefix)?;
for key in keys {
let _ = store.del(&key);
}
Ok(())
}
fn queue_pairwise_session_events_for_recipient(
recipient_owner: nostr::PublicKey,
recipient_owner_hex: &str,
rumor: &nostr::UnsignedEvent,
queue: &mut PairwiseSessionEventQueue<'_>,
) -> Result<()> {
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let mut message_events: Vec<nostr::Event> = Vec::new();
for attempt in 0..MAX_DELIVERY_ATTEMPTS {
import_chats_into_session_manager(
queue.storage,
queue.session_manager,
queue.my_owner_pubkey_hex,
)?;
let event_ids = queue
.session_manager
.send_event_recipient_only(recipient_owner, rumor.clone())
.unwrap_or_default();
let drained = drain_session_manager_message_events(queue.session_manager_rx);
sync_member_chats_from_session_manager(
queue.storage,
queue.session_manager,
recipient_owner_hex,
)?;
if !drained.is_empty() && !event_ids.is_empty() {
message_events = drained;
break;
}
if attempt + 1 < MAX_DELIVERY_ATTEMPTS {
std::thread::sleep(std::time::Duration::from_millis(DELIVERY_RETRY_MS));
}
}
if !message_events.is_empty() {
queue.queued_events.extend(message_events);
return Ok(());
}
let member_chats: Vec<_> = queue
.storage
.list_chats()?
.into_iter()
.filter(|c| c.their_pubkey == recipient_owner_hex)
.collect();
for chat in member_chats {
let session_state: nostr_double_ratchet::SessionState =
match serde_json::from_str(&chat.session_state) {
Ok(state) => state,
Err(_) => continue,
};
let mut session = Session::new(session_state, chat.id.clone());
let encrypted = match session.send_event(rumor.clone()) {
Ok(event) => event,
Err(_) => continue,
};
let mut updated_chat = chat.clone();
updated_chat.session_state = serde_json::to_string(&session.state)?;
queue.storage.save_chat(&updated_chat)?;
queue.queued_events.push(encrypted);
break;
}
Ok(())
}
pub async fn create(
name: &str,
members: &[String],
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let my_owner_pubkey = config.owner_public_key_hex()?;
let member_refs: Vec<&str> = members.iter().map(|s| s.as_str()).collect();
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
let now_ms = now.as_millis() as u64;
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let mut group_manager = build_group_manager(config, storage)?;
let (session_manager, session_manager_rx) = build_session_manager(config, storage)?;
let mut pairwise_events: Vec<nostr::Event> = Vec::new();
let mut pairwise_queue = PairwiseSessionEventQueue {
my_owner_pubkey_hex: &my_owner_pubkey,
storage,
session_manager: &session_manager,
session_manager_rx: &session_manager_rx,
queued_events: &mut pairwise_events,
};
let mut send_pairwise = |recipient_owner: nostr::PublicKey,
rumor: &nostr::UnsignedEvent|
-> nostr_double_ratchet::Result<()> {
let recipient_owner_hex = recipient_owner.to_hex();
let _ = queue_pairwise_session_events_for_recipient(
recipient_owner,
&recipient_owner_hex,
rumor,
&mut pairwise_queue,
);
Ok(())
};
let created = group_manager
.create_group(
name,
&member_refs,
CreateGroupOptions {
send_pairwise: Some(&mut send_pairwise),
fanout_metadata: true,
now_ms: Some(now_ms),
},
)
.map_err(|e| anyhow::anyhow!("Failed to create group: {}", e))?;
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let _ = publish_events_with_retry(
&client,
&pairwise_events,
MAX_DELIVERY_ATTEMPTS,
DELIVERY_RETRY_MS,
)
.await;
let stored = StoredGroup {
data: created.group,
};
storage.save_group(&stored)?;
if let Some(ref rumor) = created.metadata_rumor {
if let Some(stamp) = extract_control_stamp_from_unsigned(rumor) {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
}
output.success("group.create", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn list(storage: &Storage, output: &Output) -> Result<()> {
let groups = storage.list_groups()?;
let infos: Vec<GroupInfo> = groups.iter().map(|g| GroupInfo::from(&g.data)).collect();
output.success("group.list", GroupList { groups: infos });
Ok(())
}
pub async fn show(id: &str, storage: &Storage, output: &Output) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
output.success("group.show", GroupInfo::from(&group.data));
Ok(())
}
pub async fn delete(id: &str, storage: &Storage, output: &Output) -> Result<()> {
if storage.delete_group(id)? {
output.success_message("group.delete", &format!("Deleted group {}", id));
} else {
anyhow::bail!("Group not found: {}", id);
}
Ok(())
}
pub async fn update(
id: &str,
name: Option<&str>,
description: Option<&str>,
picture: Option<&str>,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let my_pubkey = config.owner_public_key_hex()?;
let updates = nostr_double_ratchet::group::GroupUpdate {
name: name.map(|s| s.to_string()),
description: description.map(|s| s.to_string()),
picture: picture.map(|s| s.to_string()),
};
let updated = nostr_double_ratchet::group::update_group_data(&group.data, &updates, &my_pubkey)
.ok_or_else(|| anyhow::anyhow!("Permission denied: not an admin"))?;
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if let Ok(stamp) = fan_out_metadata(&stored.data, None, config, storage).await {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
output.success("group.update", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn add_member(
id: &str,
pubkey: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let my_pubkey = config.owner_public_key_hex()?;
let old_secret = group.data.secret.clone();
let updated = nostr_double_ratchet::group::add_group_member(&group.data, pubkey, &my_pubkey)
.ok_or_else(|| anyhow::anyhow!("Cannot add member: not admin or already a member"))?;
let secret_rotated = updated.secret != old_secret;
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if secret_rotated {
let my_device_pubkey = config.public_key()?;
let _ = storage.delete_group_sender_keys(id, &my_device_pubkey)?;
let _ = invalidate_group_manager_sender_state(id, &my_device_pubkey, storage);
}
if let Ok(stamp) = fan_out_metadata(&stored.data, None, config, storage).await {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
output.success("group.add-member", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn remove_member(
id: &str,
pubkey: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let my_pubkey = config.owner_public_key_hex()?;
let old_secret = group.data.secret.clone();
let updated = nostr_double_ratchet::group::remove_group_member(&group.data, pubkey, &my_pubkey)
.ok_or_else(|| {
anyhow::anyhow!(
"Cannot remove member: not admin, not a member, or trying to remove self"
)
})?;
let secret_rotated = updated.secret != old_secret;
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if secret_rotated {
let my_device_pubkey = config.public_key()?;
let _ = storage.delete_group_sender_keys(id, &my_device_pubkey)?;
let _ = invalidate_group_manager_sender_state(id, &my_device_pubkey, storage);
}
if let Ok(stamp) = fan_out_metadata(&stored.data, Some(pubkey), config, storage).await {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
output.success("group.remove-member", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn add_admin(
id: &str,
pubkey: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let my_pubkey = config.owner_public_key_hex()?;
let updated = nostr_double_ratchet::group::add_group_admin(&group.data, pubkey, &my_pubkey)
.ok_or_else(|| {
anyhow::anyhow!("Cannot add admin: not admin, not a member, or already an admin")
})?;
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if let Ok(stamp) = fan_out_metadata(&stored.data, None, config, storage).await {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
output.success("group.add-admin", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn remove_admin(
id: &str,
pubkey: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let my_pubkey = config.owner_public_key_hex()?;
let updated = nostr_double_ratchet::group::remove_group_admin(&group.data, pubkey, &my_pubkey)
.ok_or_else(|| {
anyhow::anyhow!(
"Cannot remove admin: not admin, target not admin, or would remove last admin"
)
})?;
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if let Ok(stamp) = fan_out_metadata(&stored.data, None, config, storage).await {
storage.save_group_control_stamp(&stored.data.id, &stamp)?;
let _ = storage.delete_group_tombstone(&stored.data.id)?;
}
output.success("group.remove-admin", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn send_message(
id: &str,
message: &str,
reply_to: Option<&str>,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
if !config.is_logged_in() {
anyhow::bail!("Not logged in. Use 'ndr login <key>' first.");
}
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
if group.data.accepted != Some(true) {
anyhow::bail!("Group not accepted. Run: ndr group accept {}", id);
}
let my_owner_pubkey = config.owner_public_key_hex()?;
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
let now_s = now.as_secs();
let now_ms = now.as_millis() as u64;
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let mut group_manager = build_group_manager(config, storage)?;
group_manager
.upsert_group(group.data.clone())
.map_err(|e| anyhow::anyhow!("Failed to initialize group manager: {}", e))?;
let (session_manager, session_manager_rx) = build_session_manager(config, storage)?;
let mut pairwise_events: Vec<nostr::Event> = Vec::new();
let mut pairwise_queue = PairwiseSessionEventQueue {
my_owner_pubkey_hex: &my_owner_pubkey,
storage,
session_manager: &session_manager,
session_manager_rx: &session_manager_rx,
queued_events: &mut pairwise_events,
};
let mut send_pairwise = |recipient_owner: nostr::PublicKey,
rumor: &nostr::UnsignedEvent|
-> nostr_double_ratchet::Result<()> {
let recipient_owner_hex = recipient_owner.to_hex();
let _ = queue_pairwise_session_events_for_recipient(
recipient_owner,
&recipient_owner_hex,
rumor,
&mut pairwise_queue,
);
Ok(())
};
let mut publish_outer = |_outer: &nostr::Event| -> nostr_double_ratchet::Result<()> { Ok(()) };
let mut tags: Vec<Vec<String>> = Vec::new();
if let Some(reply_id) = reply_to {
tags.push(vec!["e".to_string(), reply_id.to_string()]);
}
let sent = group_manager
.send_event(
id,
GroupSendEvent {
kind: CHAT_MESSAGE_KIND,
content: message.to_string(),
tags,
},
&mut send_pairwise,
&mut publish_outer,
Some(now_ms),
)
.map_err(|e| anyhow::anyhow!("Failed to send group message: {}", e))?;
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let _ = publish_events_with_retry(
&client,
&pairwise_events,
MAX_DELIVERY_ATTEMPTS,
DELIVERY_RETRY_MS,
)
.await;
send_event_or_ignore(&client, sent.outer.clone()).await?;
let msg_id = sent.outer.id.to_hex();
let stored_msg = StoredGroupMessage {
id: msg_id.clone(),
group_id: id.to_string(),
sender_pubkey: my_owner_pubkey.clone(),
content: message.to_string(),
timestamp: now_s,
is_outgoing: true,
expires_at: None,
};
storage.save_group_message(&stored_msg)?;
output.success(
"group.send",
serde_json::json!({
"id": msg_id,
"group_id": id,
"content": message,
"timestamp": now_s,
"published": true,
}),
);
Ok(())
}
pub async fn react(
id: &str,
message_id: &str,
emoji: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
if !config.is_logged_in() {
anyhow::bail!("Not logged in. Use 'ndr login <key>' first.");
}
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
if group.data.accepted != Some(true) {
anyhow::bail!("Group not accepted. Run: ndr group accept {}", id);
}
let my_owner_pubkey = config.owner_public_key_hex()?;
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
let now_ms = now.as_millis() as u64;
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let mut group_manager = build_group_manager(config, storage)?;
group_manager
.upsert_group(group.data.clone())
.map_err(|e| anyhow::anyhow!("Failed to initialize group manager: {}", e))?;
let (session_manager, session_manager_rx) = build_session_manager(config, storage)?;
let mut pairwise_events: Vec<nostr::Event> = Vec::new();
let mut pairwise_queue = PairwiseSessionEventQueue {
my_owner_pubkey_hex: &my_owner_pubkey,
storage,
session_manager: &session_manager,
session_manager_rx: &session_manager_rx,
queued_events: &mut pairwise_events,
};
let mut send_pairwise = |recipient_owner: nostr::PublicKey,
rumor: &nostr::UnsignedEvent|
-> nostr_double_ratchet::Result<()> {
let recipient_owner_hex = recipient_owner.to_hex();
let _ = queue_pairwise_session_events_for_recipient(
recipient_owner,
&recipient_owner_hex,
rumor,
&mut pairwise_queue,
);
Ok(())
};
let mut publish_outer = |_outer: &nostr::Event| -> nostr_double_ratchet::Result<()> { Ok(()) };
let sent = group_manager
.send_event(
id,
GroupSendEvent {
kind: REACTION_KIND,
content: emoji.to_string(),
tags: vec![vec!["e".to_string(), message_id.to_string()]],
},
&mut send_pairwise,
&mut publish_outer,
Some(now_ms),
)
.map_err(|e| anyhow::anyhow!("Failed to send group reaction: {}", e))?;
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let _ = publish_events_with_retry(
&client,
&pairwise_events,
MAX_DELIVERY_ATTEMPTS,
DELIVERY_RETRY_MS,
)
.await;
send_event_or_ignore(&client, sent.outer).await?;
output.success(
"group.react",
serde_json::json!({
"group_id": id,
"message_id": message_id,
"emoji": emoji,
"published": true,
}),
);
Ok(())
}
pub async fn rotate_sender_key(
id: &str,
config: &Config,
storage: &Storage,
output: &Output,
) -> Result<()> {
if !config.is_logged_in() {
anyhow::bail!("Not logged in. Use 'ndr login <key>' first.");
}
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
if group.data.accepted != Some(true) {
anyhow::bail!("Group not accepted. Run: ndr group accept {}", id);
}
let my_owner_pubkey = config.owner_public_key_hex()?;
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
let now_ms = now.as_millis() as u64;
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let mut group_manager = build_group_manager(config, storage)?;
group_manager
.upsert_group(group.data.clone())
.map_err(|e| anyhow::anyhow!("Failed to initialize group manager: {}", e))?;
let (session_manager, session_manager_rx) = build_session_manager(config, storage)?;
let mut pairwise_events: Vec<nostr::Event> = Vec::new();
let mut pairwise_queue = PairwiseSessionEventQueue {
my_owner_pubkey_hex: &my_owner_pubkey,
storage,
session_manager: &session_manager,
session_manager_rx: &session_manager_rx,
queued_events: &mut pairwise_events,
};
let mut send_pairwise = |recipient_owner: nostr::PublicKey,
rumor: &nostr::UnsignedEvent|
-> nostr_double_ratchet::Result<()> {
let recipient_owner_hex = recipient_owner.to_hex();
let _ = queue_pairwise_session_events_for_recipient(
recipient_owner,
&recipient_owner_hex,
rumor,
&mut pairwise_queue,
);
Ok(())
};
let dist = group_manager
.rotate_sender_key(id, &mut send_pairwise, Some(now_ms))
.map_err(|e| anyhow::anyhow!("Failed to rotate group sender key: {}", e))?;
const MAX_DELIVERY_ATTEMPTS: usize = 20;
const DELIVERY_RETRY_MS: u64 = 100;
let _ = publish_events_with_retry(
&client,
&pairwise_events,
MAX_DELIVERY_ATTEMPTS,
DELIVERY_RETRY_MS,
)
.await;
output.success(
"group.rotate-sender-key",
serde_json::json!({
"group_id": id,
"key_id": dist.key_id,
"published": true,
}),
);
Ok(())
}
pub async fn accept(id: &str, config: &Config, storage: &Storage, output: &Output) -> Result<()> {
let group = storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
if group.data.accepted == Some(true) {
anyhow::bail!("Group already accepted");
}
let mut updated = group.data.clone();
updated.accepted = Some(true);
let stored = StoredGroup { data: updated };
storage.save_group(&stored)?;
if let Some(ref secret_hex) = stored.data.secret {
if let Ok(secret_bytes) = hex::decode(secret_hex) {
if secret_bytes.len() == 32 {
let mut arr = [0u8; 32];
arr.copy_from_slice(&secret_bytes);
if let Ok(channel) = nostr_double_ratchet::SharedChannel::new(&arr) {
let my_device_pubkey = config.public_key()?;
let my_owner_pubkey_hex = config.owner_public_key_hex()?;
let my_owner_pubkey = nostr::PublicKey::from_hex(&my_owner_pubkey_hex)?;
let my_device_private_key = config.private_key_bytes()?;
let my_device_secret_key =
nostr::SecretKey::from_slice(&my_device_private_key)?;
let my_device_keys = nostr::Keys::new(my_device_secret_key);
let my_device_pk = nostr::PublicKey::from_hex(&my_device_pubkey)?;
let mut invite =
nostr_double_ratchet::Invite::create_new(my_device_pk, None, None)?;
invite.owner_public_key = Some(my_owner_pubkey);
let invite_url = invite.get_url("https://chat.iris.to")?;
let inner_content = serde_json::json!({
"inviteUrl": invite_url,
"groupId": id,
"ownerPubkey": my_owner_pubkey_hex,
})
.to_string();
let inner_unsigned = nostr::EventBuilder::new(
nostr::Kind::Custom(nostr_double_ratchet::GROUP_INVITE_RUMOR_KIND as u16),
inner_content,
)
.tag(
nostr::Tag::parse(&["l".to_string(), id.to_string()])
.map_err(|e| anyhow::anyhow!("Invalid group tag: {}", e))?,
)
.build(my_device_pk);
let inner_signed = inner_unsigned.sign_with_keys(&my_device_keys)?;
if let Ok(event) =
channel.create_event(&nostr::JsonUtil::as_json(&inner_signed))
{
let client = Client::default();
let relays = config.resolved_relays();
for relay in &relays {
client.add_relay(relay).await?;
}
client.connect().await;
let _ = client.send_event(event).await;
}
let stored_invite = crate::storage::StoredInvite {
id: uuid::Uuid::new_v4().to_string(),
label: Some(format!("group:{}", id)),
url: invite_url,
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs(),
serialized: invite.serialize()?,
};
storage.save_invite(&stored_invite)?;
}
}
}
}
output.success("group.accept", GroupInfo::from(&stored.data));
Ok(())
}
pub async fn messages(id: &str, limit: usize, storage: &Storage, output: &Output) -> Result<()> {
storage
.get_group(id)?
.ok_or_else(|| anyhow::anyhow!("Group not found: {}", id))?;
let now_seconds = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let _ = storage.purge_expired_group_messages(id, now_seconds);
let msgs = storage.get_group_messages(id, limit)?;
let message_infos: Vec<GroupMessageInfo> = msgs
.into_iter()
.map(|m| GroupMessageInfo {
id: m.id,
group_id: m.group_id,
sender_pubkey: m.sender_pubkey,
content: m.content,
timestamp: m.timestamp,
is_outgoing: m.is_outgoing,
})
.collect();
output.success(
"group.messages",
GroupMessageList {
group_id: id.to_string(),
messages: message_infos,
},
);
Ok(())
}