imessage_private_api/
transaction.rs1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use serde_json::Value;
11use tokio::sync::{Mutex, oneshot};
12use tracing::warn;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum TransactionType {
17 Chat,
18 Message,
19 Attachment,
20 Handle,
21 FindMy,
22 Other,
23}
24
25#[derive(Debug, Clone)]
27pub struct TransactionResult {
28 pub transaction_type: TransactionType,
29 pub identifier: String,
30 pub data: Option<Value>,
31}
32
33struct PendingTransaction {
35 transaction_type: TransactionType,
36 sender: oneshot::Sender<Result<TransactionResult, String>>,
37}
38
39pub struct TransactionManager {
41 pending: Arc<Mutex<HashMap<String, PendingTransaction>>>,
42}
43
44impl TransactionManager {
45 pub fn new() -> Self {
46 Self {
47 pending: Arc::new(Mutex::new(HashMap::new())),
48 }
49 }
50
51 pub async fn create(
55 &self,
56 transaction_type: TransactionType,
57 ) -> (String, oneshot::Receiver<Result<TransactionResult, String>>) {
58 let id = uuid::Uuid::new_v4().to_string();
59 let (tx, rx) = oneshot::channel();
60
61 {
62 let mut pending = self.pending.lock().await;
63 pending.insert(
64 id.clone(),
65 PendingTransaction {
66 transaction_type,
67 sender: tx,
68 },
69 );
70 }
71
72 let pending_clone = self.pending.clone();
74 let id_clone = id.clone();
75 tokio::spawn(async move {
76 tokio::time::sleep(Duration::from_secs(120)).await;
77 let mut pending = pending_clone.lock().await;
78 if let Some(txn) = pending.remove(&id_clone) {
79 let _ = txn.sender.send(Err("Transaction timeout".to_string()));
80 warn!("Transaction {id_clone} timed out after 120s");
81 }
82 });
83
84 (id, rx)
85 }
86
87 pub async fn resolve(&self, transaction_id: &str, identifier: &str, data: Option<Value>) {
89 let mut pending = self.pending.lock().await;
90 if let Some(txn) = pending.remove(transaction_id) {
91 let _ = txn.sender.send(Ok(TransactionResult {
92 transaction_type: txn.transaction_type,
93 identifier: identifier.to_string(),
94 data,
95 }));
96 }
97 }
98
99 pub async fn reject(&self, transaction_id: &str, error: &str) {
101 let mut pending = self.pending.lock().await;
102 if let Some(txn) = pending.remove(transaction_id) {
103 let _ = txn.sender.send(Err(error.to_string()));
104 }
105 }
106
107 pub async fn is_pending(&self, transaction_id: &str) -> bool {
109 let pending = self.pending.lock().await;
110 pending.contains_key(transaction_id)
111 }
112
113 pub async fn pending_count(&self) -> usize {
115 let pending = self.pending.lock().await;
116 pending.len()
117 }
118}
119
120impl Default for TransactionManager {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129
130 #[tokio::test]
131 async fn create_and_resolve() {
132 let mgr = TransactionManager::new();
133 let (id, rx) = mgr.create(TransactionType::Message).await;
134
135 assert!(mgr.is_pending(&id).await);
136
137 mgr.resolve(
138 &id,
139 "msg-guid-123",
140 Some(serde_json::json!({"status": "ok"})),
141 )
142 .await;
143
144 let result = rx.await.unwrap().unwrap();
145 assert_eq!(result.identifier, "msg-guid-123");
146 assert_eq!(result.transaction_type, TransactionType::Message);
147 assert!(result.data.is_some());
148
149 assert!(!mgr.is_pending(&id).await);
150 }
151
152 #[tokio::test]
153 async fn create_and_reject() {
154 let mgr = TransactionManager::new();
155 let (id, rx) = mgr.create(TransactionType::Chat).await;
156
157 mgr.reject(&id, "something went wrong").await;
158
159 let result = rx.await.unwrap();
160 assert!(result.is_err());
161 assert_eq!(result.unwrap_err(), "something went wrong");
162 }
163
164 #[tokio::test]
165 async fn pending_count() {
166 let mgr = TransactionManager::new();
167 assert_eq!(mgr.pending_count().await, 0);
168
169 let (id1, _rx1) = mgr.create(TransactionType::Message).await;
170 let (_id2, _rx2) = mgr.create(TransactionType::Chat).await;
171 assert_eq!(mgr.pending_count().await, 2);
172
173 mgr.resolve(&id1, "x", None).await;
174 assert_eq!(mgr.pending_count().await, 1);
175 }
176}