mostro_client/parser/
dms.rs

1use std::collections::HashSet;
2
3use anyhow::Result;
4use base64::engine::general_purpose;
5use base64::Engine;
6use chrono::DateTime;
7use mostro_core::prelude::*;
8use nip44::v2::{decrypt_to_bytes, ConversationKey};
9use nostr_sdk::prelude::*;
10
11use crate::db::{Order, User};
12use sqlx::SqlitePool;
13
14pub async fn parse_dm_events(events: Events, pubkey: &Keys) -> Vec<(Message, u64, PublicKey)> {
15    let mut id_set = HashSet::<EventId>::new();
16    let mut direct_messages: Vec<(Message, u64, PublicKey)> = Vec::new();
17
18    for dm in events.iter() {
19        // Skip if already processed
20        if !id_set.insert(dm.id) {
21            continue;
22        }
23
24        let (created_at, message) = match dm.kind {
25            nostr_sdk::Kind::GiftWrap => {
26                let unwrapped_gift = match nip59::extract_rumor(pubkey, dm).await {
27                    Ok(u) => u,
28                    Err(_) => {
29                        println!("Error unwrapping gift");
30                        continue;
31                    }
32                };
33                let (message, _): (Message, Option<String>) =
34                    match serde_json::from_str(&unwrapped_gift.rumor.content) {
35                        Ok(msg) => msg,
36                        Err(_) => {
37                            println!("Error parsing gift wrap content");
38                            continue;
39                        }
40                    };
41                (unwrapped_gift.rumor.created_at, message)
42            }
43            nostr_sdk::Kind::PrivateDirectMessage => {
44                let ck = if let Ok(ck) = ConversationKey::derive(pubkey.secret_key(), &dm.pubkey) {
45                    ck
46                } else {
47                    continue;
48                };
49                let b64decoded_content =
50                    match general_purpose::STANDARD.decode(dm.content.as_bytes()) {
51                        Ok(b64decoded_content) => b64decoded_content,
52                        Err(_) => {
53                            continue;
54                        }
55                    };
56                let unencrypted_content = match decrypt_to_bytes(&ck, &b64decoded_content) {
57                    Ok(bytes) => bytes,
58                    Err(_) => {
59                        continue;
60                    }
61                };
62                let message_str = match String::from_utf8(unencrypted_content) {
63                    Ok(s) => s,
64                    Err(_) => {
65                        continue;
66                    }
67                };
68                let message = match Message::from_json(&message_str) {
69                    Ok(m) => m,
70                    Err(_) => {
71                        continue;
72                    }
73                };
74                (dm.created_at, message)
75            }
76            _ => continue,
77        };
78
79        let since_time = match chrono::Utc::now().checked_sub_signed(chrono::Duration::minutes(30))
80        {
81            Some(dt) => dt.timestamp() as u64,
82            None => {
83                println!("Error: Unable to calculate time 30 minutes ago");
84                continue;
85            }
86        };
87        if created_at.as_u64() < since_time {
88            continue;
89        }
90        direct_messages.push((message, created_at.as_u64(), dm.pubkey));
91    }
92    direct_messages.sort_by(|a, b| a.1.cmp(&b.1));
93    direct_messages
94}
95
96pub async fn print_direct_messages(dm: &[(Message, u64)], pool: &SqlitePool) -> Result<()> {
97    if dm.is_empty() {
98        println!();
99        println!("No new messages");
100        println!();
101    } else {
102        for m in dm.iter() {
103            let message = m.0.get_inner_message_kind();
104            let date = match DateTime::from_timestamp(m.1 as i64, 0) {
105                Some(dt) => dt,
106                None => {
107                    println!("Error: Invalid timestamp {}", m.1);
108                    continue;
109                }
110            };
111            if let Some(order_id) = message.id {
112                println!(
113                    "Mostro sent you this message for order id: {} at {}",
114                    order_id, date
115                );
116            }
117            if let Some(payload) = &message.payload {
118                match payload {
119                    Payload::PaymentRequest(_, inv, _) => {
120                        println!();
121                        println!("Pay this invoice to continue --> {}", inv);
122                        println!();
123                    }
124                    Payload::TextMessage(text) => {
125                        println!();
126                        println!("{text}");
127                        println!();
128                    }
129                    Payload::Dispute(id, info) => {
130                        println!("Action: {}", message.action);
131                        println!("Dispute id: {}", id);
132                        if let Some(info) = info {
133                            println!();
134                            println!("Dispute info: {:#?}", info);
135                            println!();
136                        }
137                    }
138                    Payload::CantDo(Some(cant_do_reason)) => {
139                        println!();
140                        println!("Error: {:?}", cant_do_reason);
141                        println!();
142                    }
143                    Payload::Order(new_order) if message.action == Action::NewOrder => {
144                        if let Some(order_id) = new_order.id {
145                            let db_order = Order::get_by_id(pool, &order_id.to_string()).await;
146                            if db_order.is_err() {
147                                if let Some(trade_index) = message.trade_index {
148                                    let trade_keys =
149                                        User::get_trade_keys(pool, trade_index).await?;
150                                    let _ = Order::new(pool, new_order.clone(), &trade_keys, None)
151                                        .await
152                                        .map_err(|e| {
153                                            anyhow::anyhow!("Failed to create DB order: {:?}", e)
154                                        })?;
155                                } else {
156                                    println!("Warning: No trade_index found for new order");
157                                }
158                            }
159                        }
160                        println!();
161                        println!("Order: {:#?}", new_order);
162                        println!();
163                    }
164                    _ => {
165                        println!();
166                        println!("Action: {}", message.action);
167                        println!("Payload: {:#?}", message.payload);
168                        println!();
169                    }
170                }
171            } else {
172                println!();
173                println!("Action: {}", message.action);
174                println!("Payload: {:#?}", message.payload);
175                println!();
176            }
177        }
178    }
179    Ok(())
180}
181
// Placeholder test module: compiled only for test builds and currently empty.
182#[cfg(test)]
183mod tests {}