1use std::io::{self, Read};
2
3use nostr_sdk::prelude::*;
4use serde::Serialize;
5
6use crate::client::{self, RelayResult};
7use crate::config::load_config;
8use crate::error::{AmError, AmResult};
9use crate::identity::load_keys;
10
/// Serializable report returned by [`send`].
#[derive(Debug, Serialize)]
pub struct SentMessage {
    /// Recipients (as passed on the command line) that `send` counted as
    /// delivered.
    pub to: Vec<String>,
    /// The message text that was sent.
    pub content: String,
    /// Recipients that `send` counted as not delivered; left out of the
    /// serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub failed: Vec<String>,
    /// Per-relay results merged across all recipients; left out of the
    /// serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub relays: Vec<RelayResult>,
}
20
/// Serializable record of one unwrapped incoming message, as produced by
/// [`listen`].
#[derive(Debug, Serialize)]
pub struct ReceivedMessage {
    /// Sender's public key in bech32 form (`listen` stores an empty string if
    /// the encoding fails).
    pub from: String,
    /// Content of the unwrapped rumor.
    pub content: String,
    /// The rumor's `created_at`, in seconds.
    pub timestamp: u64,
    /// Bech32 public keys taking part in the conversation, as computed by
    /// `extract_participants`.
    pub participants: Vec<String>,
}
28
29fn extract_participants(unwrapped: &UnwrappedGift, our_pk: &PublicKey) -> Vec<String> {
35 let mut participants: Vec<String> = unwrapped
36 .rumor
37 .tags
38 .public_keys()
39 .filter_map(|pk| pk.to_bech32().ok())
40 .collect();
41
42 let sender_npub = unwrapped.sender.to_bech32().unwrap_or_default();
44 if !participants.contains(&sender_npub) {
45 participants.push(sender_npub);
46 }
47
48 let our_npub = our_pk.to_bech32().unwrap_or_default();
50 if !participants.contains(&our_npub) {
51 participants.push(our_npub);
52 }
53
54 participants.sort();
55 participants.dedup();
56 participants
57}
58
59pub async fn send(
60 identity: Option<&str>,
61 to_npubs: &[String],
62 message: Option<&str>,
63 passphrase: Option<&str>,
64 verbosity: u8,
65) -> AmResult<SentMessage> {
66 let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
67 let config = load_config()?;
68
69 if config.relays.is_empty() {
70 return Err(AmError::Network(
71 "no relays configured; run `am relay add <url>`".into(),
72 ));
73 }
74
75 if to_npubs.is_empty() {
76 return Err(AmError::Args(
77 "at least one --to recipient is required".into(),
78 ));
79 }
80
81 let content = match message {
82 Some(m) => m.to_string(),
83 None => {
84 let mut buf = String::new();
85 io::stdin().read_to_string(&mut buf)?;
86 buf.trim_end().to_string()
87 }
88 };
89
90 if content.is_empty() {
91 return Err(AmError::Args("message is empty".into()));
92 }
93
94 let mut recipients = Vec::new();
95 for npub in to_npubs {
96 let pk = PublicKey::from_bech32(npub).map_err(|e| AmError::Crypto(e.to_string()))?;
97 recipients.push((npub.clone(), pk));
98 }
99
100 let client = client::connect(keys, &config.relays).await?;
101
102 let signer = client
104 .signer()
105 .await
106 .map_err(|e| AmError::Crypto(e.to_string()))?;
107
108 let our_pk = signer
109 .get_public_key()
110 .await
111 .map_err(|e| AmError::Crypto(e.to_string()))?;
112
113 let mut wrapped_events = Vec::new();
114 for (npub, pk) in &recipients {
115 let extra_tags: Vec<Tag> = if recipients.len() > 1 {
124 let mut tags: Vec<Tag> = recipients
125 .iter()
126 .filter(|(_, other_pk)| other_pk != pk)
127 .map(|(_, other_pk)| Tag::public_key(*other_pk))
128 .collect();
129 tags.push(Tag::public_key(our_pk));
130 tags
131 } else {
132 vec![]
133 };
134 let rumor = EventBuilder::private_msg_rumor(*pk, &content)
135 .tags(extra_tags)
136 .allow_self_tagging()
137 .build(our_pk);
138 let event = EventBuilder::gift_wrap(&signer, pk, rumor, [])
139 .await
140 .map_err(|e| AmError::Crypto(e.to_string()))?;
141 wrapped_events.push((npub.clone(), event));
142 }
143
144 let mut handles = Vec::new();
146 for (npub, event) in wrapped_events {
147 let client = client.clone();
148 let relays = config.relays.clone();
149 handles.push(tokio::spawn(async move {
150 let (relay_results, succeeded) =
151 client::send_with_retry(&client, &event, &relays, 3, verbosity).await;
152 (npub, relay_results, !succeeded.is_empty())
153 }));
154 }
155
156 let join_results = futures::future::join_all(handles).await;
157
158 let mut sent = Vec::new();
159 let mut failed = Vec::new();
160 let mut all_relay_results = Vec::new();
161
162 for result in join_results {
163 match result {
164 Ok((npub, relay_results, any_success)) => {
165 if any_success {
166 sent.push(npub);
167 } else {
168 failed.push(npub);
169 }
170 for rr in relay_results {
172 if let Some(existing) = all_relay_results
173 .iter_mut()
174 .find(|r: &&mut RelayResult| r.relay == rr.relay)
175 {
176 if matches!(rr.status, client::RelayStatus::Failed) {
178 existing.status = client::RelayStatus::Failed;
179 existing.error = rr.error;
180 }
181 } else {
182 all_relay_results.push(rr);
183 }
184 }
185 }
186 Err(e) => {
187 tracing::warn!("send task panicked: {e}");
188 }
189 }
190 }
191
192 client.disconnect().await;
193
194 if sent.is_empty() {
195 return Err(AmError::Network("failed to send to all recipients".into()));
196 }
197
198 Ok(SentMessage {
199 to: sent,
200 content,
201 failed,
202 relays: all_relay_results,
203 })
204}
205
206pub async fn listen(
207 identity: Option<&str>,
208 since: Option<u64>,
209 limit: Option<usize>,
210 once: bool,
211 timeout_secs: u64,
212 passphrase: Option<&str>,
213 _verbosity: u8,
214) -> AmResult<Vec<ReceivedMessage>> {
215 let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
216 let config = load_config()?;
217
218 if config.relays.is_empty() {
219 return Err(AmError::Network(
220 "no relays configured; run `am relay add <url>`".into(),
221 ));
222 }
223
224 let client = client::connect(keys, &config.relays).await?;
225
226 let our_pk = client
227 .public_key()
228 .await
229 .map_err(|e| AmError::Crypto(e.to_string()))?;
230
231 let filter = {
232 let mut f = Filter::new().kind(Kind::GiftWrap).pubkey(our_pk);
233 if let Some(ts) = since {
234 f = f.since(Timestamp::from(ts));
235 }
236 if let Some(l) = limit {
237 f = f.limit(l);
238 }
239 f
240 };
241
242 if once {
243 let events = client
244 .fetch_events(filter, std::time::Duration::from_secs(timeout_secs))
245 .await
246 .map_err(|e| AmError::Network(e.to_string()))?;
247
248 let mut messages = Vec::new();
249 for event in events.into_iter() {
250 if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
251 let participants = extract_participants(&unwrapped, &our_pk);
252 messages.push(ReceivedMessage {
253 from: unwrapped.sender.to_bech32().unwrap_or_default(),
254 content: unwrapped.rumor.content.clone(),
255 timestamp: unwrapped.rumor.created_at.as_secs(),
256 participants,
257 });
258 }
259 }
260 messages.sort_by_key(|m| m.timestamp);
261 client.disconnect().await;
262 Ok(messages)
263 } else {
264 client
265 .subscribe(filter, None)
266 .await
267 .map_err(|e| AmError::Network(e.to_string()))?;
268
269 client
270 .handle_notifications(|notification| async {
271 if let RelayPoolNotification::Event { event, .. } = notification {
272 if event.kind == Kind::GiftWrap {
273 if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
274 let participants = extract_participants(&unwrapped, &our_pk);
275 let msg = ReceivedMessage {
276 from: unwrapped.sender.to_bech32().unwrap_or_default(),
277 content: unwrapped.rumor.content.clone(),
278 timestamp: unwrapped.rumor.created_at.as_secs(),
279 participants,
280 };
281 if let Ok(json) = serde_json::to_string(&msg) {
282 println!("{json}");
283 }
284 }
285 }
286 }
287 Ok(false)
288 })
289 .await
290 .map_err(|e| AmError::Network(e.to_string()))?;
291
292 Ok(vec![])
293 }
294}