Skip to main content

am_core/
message.rs

1use std::io::{self, Read};
2
3use nostr_sdk::prelude::*;
4use serde::Serialize;
5
6use crate::client::{self, RelayResult};
7use crate::config::load_config;
8use crate::error::{AmError, AmResult};
9use crate::identity::load_keys;
10
11#[derive(Debug, Serialize)]
12pub struct SentMessage {
13    pub to: Vec<String>,
14    pub content: String,
15    #[serde(skip_serializing_if = "Vec::is_empty")]
16    pub failed: Vec<String>,
17    #[serde(skip_serializing_if = "Vec::is_empty")]
18    pub relays: Vec<RelayResult>,
19}
20
21#[derive(Debug, Serialize)]
22pub struct ReceivedMessage {
23    pub from: String,
24    pub content: String,
25    pub timestamp: u64,
26}
27
28pub async fn send(
29    identity: Option<&str>,
30    to_npubs: &[String],
31    message: Option<&str>,
32    passphrase: Option<&str>,
33    verbosity: u8,
34) -> AmResult<SentMessage> {
35    let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
36    let config = load_config()?;
37
38    if config.relays.is_empty() {
39        return Err(AmError::Network(
40            "no relays configured; run `am relay add <url>`".into(),
41        ));
42    }
43
44    if to_npubs.is_empty() {
45        return Err(AmError::Args(
46            "at least one --to recipient is required".into(),
47        ));
48    }
49
50    let content = match message {
51        Some(m) => m.to_string(),
52        None => {
53            let mut buf = String::new();
54            io::stdin().read_to_string(&mut buf)?;
55            buf.trim_end().to_string()
56        }
57    };
58
59    if content.is_empty() {
60        return Err(AmError::Args("message is empty".into()));
61    }
62
63    let mut recipients = Vec::new();
64    for npub in to_npubs {
65        let pk = PublicKey::from_bech32(npub).map_err(|e| AmError::Crypto(e.to_string()))?;
66        recipients.push((npub.clone(), pk));
67    }
68
69    let client = client::connect(keys, &config.relays).await?;
70
71    // Build gift-wrap events (sequential — each needs unique encryption)
72    let signer = client
73        .signer()
74        .await
75        .map_err(|e| AmError::Crypto(e.to_string()))?;
76
77    let our_pk = signer
78        .get_public_key()
79        .await
80        .map_err(|e| AmError::Crypto(e.to_string()))?;
81
82    let mut wrapped_events = Vec::new();
83    for (npub, pk) in &recipients {
84        // NIP-17: the p-tag set defines the chat room. Clients like 0xchat
85        // match rooms by p-tags, so we must include ALL participants —
86        // every other recipient plus the sender — so the room identity
87        // is consistent across all participants.
88        //
89        // We build the rumor manually instead of using EventBuilder::private_msg
90        // because the builder strips p-tags matching the author by default.
91        // Group chats require the sender's own p-tag for room identity.
92        let extra_tags: Vec<Tag> = if recipients.len() > 1 {
93            let mut tags: Vec<Tag> = recipients
94                .iter()
95                .filter(|(_, other_pk)| other_pk != pk)
96                .map(|(_, other_pk)| Tag::public_key(*other_pk))
97                .collect();
98            tags.push(Tag::public_key(our_pk));
99            tags
100        } else {
101            vec![]
102        };
103        let rumor = EventBuilder::private_msg_rumor(*pk, &content)
104            .tags(extra_tags)
105            .allow_self_tagging()
106            .build(our_pk);
107        let event = EventBuilder::gift_wrap(&signer, pk, rumor, [])
108            .await
109            .map_err(|e| AmError::Crypto(e.to_string()))?;
110        wrapped_events.push((npub.clone(), event));
111    }
112
113    // Send all events concurrently with retry
114    let mut handles = Vec::new();
115    for (npub, event) in wrapped_events {
116        let client = client.clone();
117        let relays = config.relays.clone();
118        handles.push(tokio::spawn(async move {
119            let (relay_results, succeeded) =
120                client::send_with_retry(&client, &event, &relays, 3, verbosity).await;
121            (npub, relay_results, !succeeded.is_empty())
122        }));
123    }
124
125    let join_results = futures::future::join_all(handles).await;
126
127    let mut sent = Vec::new();
128    let mut failed = Vec::new();
129    let mut all_relay_results = Vec::new();
130
131    for result in join_results {
132        match result {
133            Ok((npub, relay_results, any_success)) => {
134                if any_success {
135                    sent.push(npub);
136                } else {
137                    failed.push(npub);
138                }
139                // Merge relay results (dedup by relay URL, keep worst status)
140                for rr in relay_results {
141                    if let Some(existing) = all_relay_results
142                        .iter_mut()
143                        .find(|r: &&mut RelayResult| r.relay == rr.relay)
144                    {
145                        // If any send to this relay failed, mark it failed
146                        if matches!(rr.status, client::RelayStatus::Failed) {
147                            existing.status = client::RelayStatus::Failed;
148                            existing.error = rr.error;
149                        }
150                    } else {
151                        all_relay_results.push(rr);
152                    }
153                }
154            }
155            Err(e) => {
156                tracing::warn!("send task panicked: {e}");
157            }
158        }
159    }
160
161    client.disconnect().await;
162
163    if sent.is_empty() {
164        return Err(AmError::Network("failed to send to all recipients".into()));
165    }
166
167    Ok(SentMessage {
168        to: sent,
169        content,
170        failed,
171        relays: all_relay_results,
172    })
173}
174
175pub async fn listen(
176    identity: Option<&str>,
177    since: Option<u64>,
178    limit: Option<usize>,
179    once: bool,
180    timeout_secs: u64,
181    passphrase: Option<&str>,
182    _verbosity: u8,
183) -> AmResult<Vec<ReceivedMessage>> {
184    let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
185    let config = load_config()?;
186
187    if config.relays.is_empty() {
188        return Err(AmError::Network(
189            "no relays configured; run `am relay add <url>`".into(),
190        ));
191    }
192
193    let client = client::connect(keys, &config.relays).await?;
194
195    let filter = {
196        let our_pk = client
197            .public_key()
198            .await
199            .map_err(|e| AmError::Crypto(e.to_string()))?;
200        let mut f = Filter::new().kind(Kind::GiftWrap).pubkey(our_pk);
201        if let Some(ts) = since {
202            f = f.since(Timestamp::from(ts));
203        }
204        if let Some(l) = limit {
205            f = f.limit(l);
206        }
207        f
208    };
209
210    if once {
211        let events = client
212            .fetch_events(filter, std::time::Duration::from_secs(timeout_secs))
213            .await
214            .map_err(|e| AmError::Network(e.to_string()))?;
215
216        let mut messages = Vec::new();
217        for event in events.into_iter() {
218            if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
219                messages.push(ReceivedMessage {
220                    from: unwrapped.sender.to_bech32().unwrap_or_default(),
221                    content: unwrapped.rumor.content.clone(),
222                    timestamp: unwrapped.rumor.created_at.as_secs(),
223                });
224            }
225        }
226        messages.sort_by_key(|m| m.timestamp);
227        client.disconnect().await;
228        Ok(messages)
229    } else {
230        client
231            .subscribe(filter, None)
232            .await
233            .map_err(|e| AmError::Network(e.to_string()))?;
234
235        client
236            .handle_notifications(|notification| async {
237                if let RelayPoolNotification::Event { event, .. } = notification {
238                    if event.kind == Kind::GiftWrap {
239                        if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
240                            let msg = ReceivedMessage {
241                                from: unwrapped.sender.to_bech32().unwrap_or_default(),
242                                content: unwrapped.rumor.content.clone(),
243                                timestamp: unwrapped.rumor.created_at.as_secs(),
244                            };
245                            if let Ok(json) = serde_json::to_string(&msg) {
246                                println!("{json}");
247                            }
248                        }
249                    }
250                }
251                Ok(false)
252            })
253            .await
254            .map_err(|e| AmError::Network(e.to_string()))?;
255
256        Ok(vec![])
257    }
258}