1use std::io::{self, Read};
2
3use nostr_sdk::prelude::*;
4use serde::Serialize;
5
6use crate::client::{self, RelayResult};
7use crate::config::load_config;
8use crate::error::{AmError, AmResult};
9use crate::identity::load_keys;
10
11#[derive(Debug, Serialize)]
12pub struct SentMessage {
13 pub to: Vec<String>,
14 pub content: String,
15 #[serde(skip_serializing_if = "Vec::is_empty")]
16 pub failed: Vec<String>,
17 #[serde(skip_serializing_if = "Vec::is_empty")]
18 pub relays: Vec<RelayResult>,
19}
20
21#[derive(Debug, Serialize)]
22pub struct ReceivedMessage {
23 pub from: String,
24 pub content: String,
25 pub timestamp: u64,
26}
27
28pub async fn send(
29 identity: Option<&str>,
30 to_npubs: &[String],
31 message: Option<&str>,
32 passphrase: Option<&str>,
33 verbosity: u8,
34) -> AmResult<SentMessage> {
35 let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
36 let config = load_config()?;
37
38 if config.relays.is_empty() {
39 return Err(AmError::Network(
40 "no relays configured; run `am relay add <url>`".into(),
41 ));
42 }
43
44 if to_npubs.is_empty() {
45 return Err(AmError::Args(
46 "at least one --to recipient is required".into(),
47 ));
48 }
49
50 let content = match message {
51 Some(m) => m.to_string(),
52 None => {
53 let mut buf = String::new();
54 io::stdin().read_to_string(&mut buf)?;
55 buf.trim_end().to_string()
56 }
57 };
58
59 if content.is_empty() {
60 return Err(AmError::Args("message is empty".into()));
61 }
62
63 let mut recipients = Vec::new();
64 for npub in to_npubs {
65 let pk = PublicKey::from_bech32(npub).map_err(|e| AmError::Crypto(e.to_string()))?;
66 recipients.push((npub.clone(), pk));
67 }
68
69 let client = client::connect(keys, &config.relays).await?;
70
71 let signer = client
73 .signer()
74 .await
75 .map_err(|e| AmError::Crypto(e.to_string()))?;
76
77 let our_pk = signer
78 .get_public_key()
79 .await
80 .map_err(|e| AmError::Crypto(e.to_string()))?;
81
82 let mut wrapped_events = Vec::new();
83 for (npub, pk) in &recipients {
84 let extra_tags: Vec<Tag> = if recipients.len() > 1 {
93 let mut tags: Vec<Tag> = recipients
94 .iter()
95 .filter(|(_, other_pk)| other_pk != pk)
96 .map(|(_, other_pk)| Tag::public_key(*other_pk))
97 .collect();
98 tags.push(Tag::public_key(our_pk));
99 tags
100 } else {
101 vec![]
102 };
103 let rumor = EventBuilder::private_msg_rumor(*pk, &content)
104 .tags(extra_tags)
105 .allow_self_tagging()
106 .build(our_pk);
107 let event = EventBuilder::gift_wrap(&signer, pk, rumor, [])
108 .await
109 .map_err(|e| AmError::Crypto(e.to_string()))?;
110 wrapped_events.push((npub.clone(), event));
111 }
112
113 let mut handles = Vec::new();
115 for (npub, event) in wrapped_events {
116 let client = client.clone();
117 let relays = config.relays.clone();
118 handles.push(tokio::spawn(async move {
119 let (relay_results, succeeded) =
120 client::send_with_retry(&client, &event, &relays, 3, verbosity).await;
121 (npub, relay_results, !succeeded.is_empty())
122 }));
123 }
124
125 let join_results = futures::future::join_all(handles).await;
126
127 let mut sent = Vec::new();
128 let mut failed = Vec::new();
129 let mut all_relay_results = Vec::new();
130
131 for result in join_results {
132 match result {
133 Ok((npub, relay_results, any_success)) => {
134 if any_success {
135 sent.push(npub);
136 } else {
137 failed.push(npub);
138 }
139 for rr in relay_results {
141 if let Some(existing) = all_relay_results
142 .iter_mut()
143 .find(|r: &&mut RelayResult| r.relay == rr.relay)
144 {
145 if matches!(rr.status, client::RelayStatus::Failed) {
147 existing.status = client::RelayStatus::Failed;
148 existing.error = rr.error;
149 }
150 } else {
151 all_relay_results.push(rr);
152 }
153 }
154 }
155 Err(e) => {
156 tracing::warn!("send task panicked: {e}");
157 }
158 }
159 }
160
161 client.disconnect().await;
162
163 if sent.is_empty() {
164 return Err(AmError::Network("failed to send to all recipients".into()));
165 }
166
167 Ok(SentMessage {
168 to: sent,
169 content,
170 failed,
171 relays: all_relay_results,
172 })
173}
174
175pub async fn listen(
176 identity: Option<&str>,
177 since: Option<u64>,
178 limit: Option<usize>,
179 once: bool,
180 timeout_secs: u64,
181 passphrase: Option<&str>,
182 _verbosity: u8,
183) -> AmResult<Vec<ReceivedMessage>> {
184 let keys = load_keys(identity.unwrap_or("default"), passphrase)?;
185 let config = load_config()?;
186
187 if config.relays.is_empty() {
188 return Err(AmError::Network(
189 "no relays configured; run `am relay add <url>`".into(),
190 ));
191 }
192
193 let client = client::connect(keys, &config.relays).await?;
194
195 let filter = {
196 let our_pk = client
197 .public_key()
198 .await
199 .map_err(|e| AmError::Crypto(e.to_string()))?;
200 let mut f = Filter::new().kind(Kind::GiftWrap).pubkey(our_pk);
201 if let Some(ts) = since {
202 f = f.since(Timestamp::from(ts));
203 }
204 if let Some(l) = limit {
205 f = f.limit(l);
206 }
207 f
208 };
209
210 if once {
211 let events = client
212 .fetch_events(filter, std::time::Duration::from_secs(timeout_secs))
213 .await
214 .map_err(|e| AmError::Network(e.to_string()))?;
215
216 let mut messages = Vec::new();
217 for event in events.into_iter() {
218 if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
219 messages.push(ReceivedMessage {
220 from: unwrapped.sender.to_bech32().unwrap_or_default(),
221 content: unwrapped.rumor.content.clone(),
222 timestamp: unwrapped.rumor.created_at.as_secs(),
223 });
224 }
225 }
226 messages.sort_by_key(|m| m.timestamp);
227 client.disconnect().await;
228 Ok(messages)
229 } else {
230 client
231 .subscribe(filter, None)
232 .await
233 .map_err(|e| AmError::Network(e.to_string()))?;
234
235 client
236 .handle_notifications(|notification| async {
237 if let RelayPoolNotification::Event { event, .. } = notification {
238 if event.kind == Kind::GiftWrap {
239 if let Ok(unwrapped) = client.unwrap_gift_wrap(&event).await {
240 let msg = ReceivedMessage {
241 from: unwrapped.sender.to_bech32().unwrap_or_default(),
242 content: unwrapped.rumor.content.clone(),
243 timestamp: unwrapped.rumor.created_at.as_secs(),
244 };
245 if let Ok(json) = serde_json::to_string(&msg) {
246 println!("{json}");
247 }
248 }
249 }
250 }
251 Ok(false)
252 })
253 .await
254 .map_err(|e| AmError::Network(e.to_string()))?;
255
256 Ok(vec![])
257 }
258}