Skip to main content

am_core/
message.rs

1use std::io::{self, Read};
2
3use nostr_sdk::prelude::*;
4use serde::Serialize;
5
6use crate::client::{self, RelayResult};
7use crate::config::load_config;
8use crate::error::{AmError, AmResult};
9use crate::identity::load_keys;
10
11#[derive(Debug, Serialize)]
12pub struct SentMessage {
13    pub to: Vec<String>,
14    pub content: String,
15    #[serde(skip_serializing_if = "Vec::is_empty")]
16    pub failed: Vec<String>,
17    #[serde(skip_serializing_if = "Vec::is_empty")]
18    pub relays: Vec<RelayResult>,
19}
20
21#[derive(Debug, Serialize)]
22pub struct ReceivedMessage {
23    pub from: String,
24    pub content: String,
25    pub timestamp: u64,
26    pub participants: Vec<String>,
27}
28
29/// Extract sorted, deduplicated participant npubs from an unwrapped gift wrap.
30///
31/// For group chats, the rumor contains p-tags for all participants.
32/// For 1:1 DMs, the rumor has only one p-tag (the recipient), so we
33/// construct participants from the sender + our own pubkey.
34fn extract_participants(unwrapped: &UnwrappedGift, our_pk: &PublicKey) -> Vec<String> {
35    let mut participants: Vec<String> = unwrapped
36        .rumor
37        .tags
38        .public_keys()
39        .filter_map(|pk| pk.to_bech32().ok())
40        .collect();
41
42    // Always include the sender
43    let sender_npub = unwrapped.sender.to_bech32().unwrap_or_default();
44    if !participants.contains(&sender_npub) {
45        participants.push(sender_npub);
46    }
47
48    // Always include ourselves
49    let our_npub = our_pk.to_bech32().unwrap_or_default();
50    if !participants.contains(&our_npub) {
51        participants.push(our_npub);
52    }
53
54    participants.sort();
55    participants.dedup();
56    participants
57}
58
59pub async fn send(
60    identity: Option<&str>,
61    to_npubs: &[String],
62    message: Option<&str>,
63    passphrase: Option<&str>,
64    verbosity: u8,
65) -> AmResult<SentMessage> {
66    let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
67    let config = load_config()?;
68
69    if config.relays.is_empty() {
70        return Err(AmError::Network(
71            "no relays configured; run `am relay add <url>`".into(),
72        ));
73    }
74
75    if to_npubs.is_empty() {
76        return Err(AmError::Args(
77            "at least one --to recipient is required".into(),
78        ));
79    }
80
81    let content = match message {
82        Some(m) => m.to_string(),
83        None => {
84            let mut buf = String::new();
85            io::stdin().read_to_string(&mut buf)?;
86            buf.trim_end().to_string()
87        }
88    };
89
90    if content.is_empty() {
91        return Err(AmError::Args("message is empty".into()));
92    }
93
94    let mut recipients = Vec::new();
95    for npub in to_npubs {
96        let pk = PublicKey::from_bech32(npub).map_err(|e| AmError::Crypto(e.to_string()))?;
97        recipients.push((npub.clone(), pk));
98    }
99
100    let client = client::connect(keys, &config.relays).await?;
101
102    // Build gift-wrap events (sequential — each needs unique encryption)
103    let signer = client
104        .signer()
105        .await
106        .map_err(|e| AmError::Crypto(e.to_string()))?;
107
108    let our_pk = signer
109        .get_public_key()
110        .await
111        .map_err(|e| AmError::Crypto(e.to_string()))?;
112
113    let mut wrapped_events = Vec::new();
114    for (npub, pk) in &recipients {
115        // NIP-17: the p-tag set defines the chat room. Clients like 0xchat
116        // match rooms by p-tags, so we must include ALL participants —
117        // every other recipient plus the sender — so the room identity
118        // is consistent across all participants.
119        //
120        // We build the rumor manually instead of using EventBuilder::private_msg
121        // because the builder strips p-tags matching the author by default.
122        // Group chats require the sender's own p-tag for room identity.
123        let extra_tags: Vec<Tag> = if recipients.len() > 1 {
124            let mut tags: Vec<Tag> = recipients
125                .iter()
126                .filter(|(_, other_pk)| other_pk != pk)
127                .map(|(_, other_pk)| Tag::public_key(*other_pk))
128                .collect();
129            tags.push(Tag::public_key(our_pk));
130            tags
131        } else {
132            vec![]
133        };
134        let rumor = EventBuilder::private_msg_rumor(*pk, &content)
135            .tags(extra_tags)
136            .allow_self_tagging()
137            .build(our_pk);
138        let event = EventBuilder::gift_wrap(&signer, pk, rumor, [])
139            .await
140            .map_err(|e| AmError::Crypto(e.to_string()))?;
141        wrapped_events.push((npub.clone(), event));
142    }
143
144    // Send all events concurrently with retry
145    let mut handles = Vec::new();
146    for (npub, event) in wrapped_events {
147        let client = client.clone();
148        let relays = config.relays.clone();
149        handles.push(tokio::spawn(async move {
150            let (relay_results, succeeded) =
151                client::send_with_retry(&client, &event, &relays, 3, verbosity).await;
152            (npub, relay_results, !succeeded.is_empty())
153        }));
154    }
155
156    let join_results = futures::future::join_all(handles).await;
157
158    let mut sent = Vec::new();
159    let mut failed = Vec::new();
160    let mut all_relay_results = Vec::new();
161
162    for result in join_results {
163        match result {
164            Ok((npub, relay_results, any_success)) => {
165                if any_success {
166                    sent.push(npub);
167                } else {
168                    failed.push(npub);
169                }
170                // Merge relay results (dedup by relay URL, keep worst status)
171                for rr in relay_results {
172                    if let Some(existing) = all_relay_results
173                        .iter_mut()
174                        .find(|r: &&mut RelayResult| r.relay == rr.relay)
175                    {
176                        // If any send to this relay failed, mark it failed
177                        if matches!(rr.status, client::RelayStatus::Failed) {
178                            existing.status = client::RelayStatus::Failed;
179                            existing.error = rr.error;
180                        }
181                    } else {
182                        all_relay_results.push(rr);
183                    }
184                }
185            }
186            Err(e) => {
187                tracing::warn!("send task panicked: {e}");
188            }
189        }
190    }
191
192    client.disconnect().await;
193
194    if sent.is_empty() {
195        return Err(AmError::Network("failed to send to all recipients".into()));
196    }
197
198    Ok(SentMessage {
199        to: sent,
200        content,
201        failed,
202        relays: all_relay_results,
203    })
204}
205
206pub async fn listen(
207    identity: Option<&str>,
208    since: Option<u64>,
209    limit: Option<usize>,
210    once: bool,
211    timeout_secs: u64,
212    passphrase: Option<&str>,
213    _verbosity: u8,
214) -> AmResult<Vec<ReceivedMessage>> {
215    let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
216    let config = load_config()?;
217
218    if config.relays.is_empty() {
219        return Err(AmError::Network(
220            "no relays configured; run `am relay add <url>`".into(),
221        ));
222    }
223
224    let client = client::connect(keys, &config.relays).await?;
225
226    let our_pk = client
227        .public_key()
228        .await
229        .map_err(|e| AmError::Crypto(e.to_string()))?;
230
231    let filter = {
232        let mut f = Filter::new().kind(Kind::GiftWrap).pubkey(our_pk);
233        if let Some(ts) = since {
234            f = f.since(Timestamp::from(ts));
235        }
236        if let Some(l) = limit {
237            f = f.limit(l);
238        }
239        f
240    };
241
242    if once {
243        let events = client
244            .fetch_events(filter, std::time::Duration::from_secs(timeout_secs))
245            .await
246            .map_err(|e| AmError::Network(e.to_string()))?;
247
248        let mut messages = Vec::new();
249        for event in events.into_iter() {
250            if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
251                let participants = extract_participants(&unwrapped, &our_pk);
252                messages.push(ReceivedMessage {
253                    from: unwrapped.sender.to_bech32().unwrap_or_default(),
254                    content: unwrapped.rumor.content.clone(),
255                    timestamp: unwrapped.rumor.created_at.as_secs(),
256                    participants,
257                });
258            }
259        }
260        messages.sort_by_key(|m| m.timestamp);
261        client.disconnect().await;
262        Ok(messages)
263    } else {
264        client
265            .subscribe(filter, None)
266            .await
267            .map_err(|e| AmError::Network(e.to_string()))?;
268
269        client
270            .handle_notifications(|notification| async {
271                if let RelayPoolNotification::Event { event, .. } = notification {
272                    if event.kind == Kind::GiftWrap {
273                        if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
274                            let participants = extract_participants(&unwrapped, &our_pk);
275                            let msg = ReceivedMessage {
276                                from: unwrapped.sender.to_bech32().unwrap_or_default(),
277                                content: unwrapped.rumor.content.clone(),
278                                timestamp: unwrapped.rumor.created_at.as_secs(),
279                                participants,
280                            };
281                            if let Ok(json) = serde_json::to_string(&msg) {
282                                println!("{json}");
283                            }
284                        }
285                    }
286                }
287                Ok(false)
288            })
289            .await
290            .map_err(|e| AmError::Network(e.to_string()))?;
291
292        Ok(vec![])
293    }
294}