Skip to main content

imessage_private_api/
service.rs

//! Private API TCP service.
//!
//! Binds a TCP server on localhost, accepts connections from the helper dylib,
//! sends actions as newline-delimited JSON, and processes incoming events/responses.
//!
//! Port: 45670 + (uid - 501), clamped to [45670, 65535].
//! Write lock: Semaphore(1), released 200ms after each write.
8use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13use anyhow::{Result, bail};
14use serde_json::json;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::net::{TcpListener, TcpStream};
17use tokio::sync::{Mutex, Semaphore, broadcast};
18use tracing::{error, info, warn};
19
20use crate::actions::Action;
21use crate::events::RawEvent;
22use crate::transaction::{TransactionManager, TransactionResult};
23
/// Base port: `calculate_port` adds the uid-derived offset to this value,
/// so it is also the lowest port the service can end up on.
const MIN_PORT: u16 = 45670;
/// Upper clamp applied by `calculate_port`; equal to the largest `u16` value.
const MAX_PORT: u16 = 65535;
26
27/// Calculate the Private API port for the current user.
28pub fn calculate_port() -> u16 {
29    let uid = unsafe { libc::getuid() };
30    let port = MIN_PORT as u32 + uid.saturating_sub(501);
31    port.min(MAX_PORT as u32) as u16
32}
33
/// A connected client socket.
///
/// One instance is created per accepted TCP connection. It is shared behind
/// a mutex between `send_action` (which writes to it) and the connection's
/// reader task (which records the process name and removes it on disconnect).
struct Client {
    /// Random UUID string assigned on accept; also the key in the clients map.
    id: String,
    /// Write half of the split TCP stream; outgoing actions are written here.
    writer: tokio::io::WriteHalf<TcpStream>,
    /// Process name taken from the client's "ping" event; `None` until a
    /// ping carrying a `process` field has been received.
    process: Option<String>,
}
40
/// Bundled context for the client reader task (avoids too many function arguments).
///
/// Every field is a cheap clone of shared state owned by `PrivateApiService`,
/// made when the connection is accepted.
struct ReadContext {
    /// The client this reader task serves.
    client: Arc<Mutex<Client>>,
    /// Map of all connected clients, keyed by client id; the reader removes
    /// its own entry when the connection ends.
    clients: Arc<Mutex<HashMap<String, Arc<Mutex<Client>>>>>,
    /// Channel on which parsed non-transaction events are broadcast.
    event_tx: broadcast::Sender<RawEvent>,
    /// Transaction manager that is resolved/rejected by incoming responses.
    txn_mgr: Arc<TransactionManager>,
    /// Readiness flag toggled for "com.apple.MobileSMS" / "com.apple.Messages".
    messages_ready: Arc<AtomicBool>,
    /// Readiness flag toggled for "com.apple.FaceTime" / "com.apple.TelephonyUtilities".
    facetime_ready: Arc<AtomicBool>,
    /// Readiness flag toggled for "com.apple.findmy".
    findmy_ready: Arc<AtomicBool>,
}
51
/// The Private API service manages the TCP server and connected dylib clients.
pub struct PrivateApiService {
    /// Localhost port the server binds to; initialised from `calculate_port()`.
    port: u16,
    /// Tracks pending transactions created by `send_action` and completed by
    /// responses read from clients.
    transaction_manager: Arc<TransactionManager>,
    /// Single-permit semaphore serialising `send_action`; the permit is
    /// released 200ms after each write.
    write_lock: Arc<Semaphore>,
    /// All currently connected clients, keyed by client id.
    clients: Arc<Mutex<HashMap<String, Arc<Mutex<Client>>>>>,
    /// Sender side of the event broadcast; receivers come from `subscribe_events`.
    event_tx: broadcast::Sender<RawEvent>,
    /// Shutdown signal for the accept loop; `None` until `start` has run.
    shutdown_tx: Option<broadcast::Sender<()>>,
    /// Per-process readiness: set when the dylib sends a "ready" event after
    /// eagerly initializing IMCore singletons. Reset on client disconnect.
    messages_ready: Arc<AtomicBool>,
    /// Same readiness flag, for the FaceTime process.
    facetime_ready: Arc<AtomicBool>,
    /// Same readiness flag, for the FindMy process; can also be cleared
    /// explicitly through `clear_findmy_ready`.
    findmy_ready: Arc<AtomicBool>,
}
66
67impl PrivateApiService {
68    pub fn new() -> Self {
69        let (event_tx, _) = broadcast::channel(256);
70        Self {
71            port: calculate_port(),
72            transaction_manager: Arc::new(TransactionManager::new()),
73            write_lock: Arc::new(Semaphore::new(1)),
74            clients: Arc::new(Mutex::new(HashMap::new())),
75            event_tx,
76            shutdown_tx: None,
77            messages_ready: Arc::new(AtomicBool::new(false)),
78            facetime_ready: Arc::new(AtomicBool::new(false)),
79            findmy_ready: Arc::new(AtomicBool::new(false)),
80        }
81    }
82
    /// Subscribe to incoming events from the dylib.
    ///
    /// Returns a new receiver on the service's broadcast channel. The reader
    /// tasks publish every successfully parsed event that is not a
    /// transaction response on this channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<RawEvent> {
        self.event_tx.subscribe()
    }
87
    /// Get a reference to the transaction manager.
    ///
    /// This is the same shared manager used by `send_action` and by the
    /// client reader tasks; clone the `Arc` to keep it beyond the borrow.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
92
93    /// Check if any dylib clients are connected.
94    pub async fn is_connected(&self) -> bool {
95        let clients = self.clients.lock().await;
96        !clients.is_empty()
97    }
98
    /// Check if the Messages.app dylib has finished IMCore initialization.
    ///
    /// Reads the flag that is set by a "ready" event from the Messages
    /// process and cleared when that client disconnects.
    pub fn is_messages_ready(&self) -> bool {
        self.messages_ready.load(Ordering::Acquire)
    }
103
    /// Check if the FaceTime.app dylib is ready.
    ///
    /// Reads the flag that is set by a "ready" event from the FaceTime
    /// process and cleared when that client disconnects.
    pub fn is_facetime_ready(&self) -> bool {
        self.facetime_ready.load(Ordering::Acquire)
    }
108
    /// Check if the FindMy.app dylib is ready.
    ///
    /// Reads the flag that is set by a "ready" event from the FindMy process
    /// and cleared on disconnect or by `clear_findmy_ready`.
    pub fn is_findmy_ready(&self) -> bool {
        self.findmy_ready.load(Ordering::Acquire)
    }
113
    /// Clear the FindMy readiness flag (used before restarting FindMy.app).
    ///
    /// After this call `is_findmy_ready` returns `false` until a new "ready"
    /// event for the FindMy process is received.
    pub fn clear_findmy_ready(&self) {
        self.findmy_ready.store(false, Ordering::Release);
    }
118
119    /// Start the TCP server. Returns a handle to the server task.
120    pub async fn start(&mut self) -> Result<tokio::task::JoinHandle<()>> {
121        let addr = format!("127.0.0.1:{}", self.port);
122        let listener = TcpListener::bind(&addr).await?;
123        info!("Private API TCP server listening on {addr}");
124
125        let (shutdown_tx, _) = broadcast::channel(1);
126        self.shutdown_tx = Some(shutdown_tx.clone());
127
128        let clients = self.clients.clone();
129        let event_tx = self.event_tx.clone();
130        let txn_mgr = self.transaction_manager.clone();
131        let messages_ready = self.messages_ready.clone();
132        let facetime_ready = self.facetime_ready.clone();
133        let findmy_ready = self.findmy_ready.clone();
134
135        let handle = tokio::spawn(async move {
136            let mut shutdown_rx = shutdown_tx.subscribe();
137
138            loop {
139                tokio::select! {
140                    result = listener.accept() => {
141                        match result {
142                            Ok((stream, addr)) => {
143                                let client_id = uuid::Uuid::new_v4().to_string();
144                                info!("Private API client connected: {addr} (id: {client_id})");
145
146                                let (reader, writer) = tokio::io::split(stream);
147                                let client = Arc::new(Mutex::new(Client {
148                                    id: client_id.clone(),
149                                    writer,
150                                    process: None,
151                                }));
152
153                                {
154                                    let mut clients = clients.lock().await;
155                                    clients.insert(client_id.clone(), client.clone());
156                                }
157
158                                // Spawn a reader task for this client
159                                let ctx = ReadContext {
160                                    client: client.clone(),
161                                    clients: clients.clone(),
162                                    event_tx: event_tx.clone(),
163                                    txn_mgr: txn_mgr.clone(),
164                                    messages_ready: messages_ready.clone(),
165                                    facetime_ready: facetime_ready.clone(),
166                                    findmy_ready: findmy_ready.clone(),
167                                };
168                                let client_id_clone = client_id.clone();
169
170                                tokio::spawn(async move {
171                                    Self::handle_client_reads(
172                                        reader,
173                                        &client_id_clone,
174                                        ctx,
175                                    )
176                                    .await;
177                                });
178                            }
179                            Err(e) => {
180                                error!("Failed to accept TCP connection: {e}");
181                            }
182                        }
183                    }
184                    _ = shutdown_rx.recv() => {
185                        info!("Private API TCP server shutting down");
186                        break;
187                    }
188                }
189            }
190        });
191
192        Ok(handle)
193    }
194
195    /// Stop the TCP server.
196    pub fn stop(&self) {
197        if let Some(ref tx) = self.shutdown_tx {
198            let _ = tx.send(());
199        }
200    }
201
202    /// Send an action to all connected clients, optionally awaiting a transaction response.
203    pub async fn send_action(&self, action: Action) -> Result<Option<TransactionResult>> {
204        // Acquire write lock
205        let permit = self.write_lock.clone().acquire_owned().await?;
206
207        let has_transaction = action.transaction_type.is_some();
208        let (transaction_id, rx) = if let Some(txn_type) = action.transaction_type {
209            let (id, rx) = self.transaction_manager.create(txn_type).await;
210            (Some(id), Some(rx))
211        } else {
212            (None, None)
213        };
214
215        // Build the wire message
216        let mut msg = json!({
217            "action": action.name,
218            "data": action.data,
219        });
220        if let Some(ref id) = transaction_id {
221            msg["transactionId"] = json!(id);
222        }
223
224        let wire = format!("{}\n", serde_json::to_string(&msg)?);
225
226        // Write to all connected clients
227        let clients = self.clients.lock().await;
228        let mut write_success = false;
229
230        for (_, client) in clients.iter() {
231            let mut client = client.lock().await;
232            match client.writer.write_all(wire.as_bytes()).await {
233                Ok(_) => {
234                    write_success = true;
235                }
236                Err(e) => {
237                    warn!("Failed to write to client {}: {e}", client.id);
238                }
239            }
240        }
241        drop(clients);
242
243        if !write_success && has_transaction {
244            // Clean up the transaction
245            if let Some(ref id) = transaction_id {
246                self.transaction_manager
247                    .reject(id, "No connected clients")
248                    .await;
249            }
250        }
251
252        // Release write lock after 200ms delay
253        tokio::spawn(async move {
254            tokio::time::sleep(Duration::from_millis(200)).await;
255            drop(permit);
256        });
257
258        // Await transaction response if applicable
259        if let Some(rx) = rx {
260            match rx.await {
261                Ok(Ok(result)) => Ok(Some(result)),
262                Ok(Err(e)) => bail!("Transaction error: {e}"),
263                Err(_) => bail!("Transaction channel closed"),
264            }
265        } else {
266            Ok(None)
267        }
268    }
269
270    /// Handle reading from a single client connection.
271    async fn handle_client_reads(
272        reader: tokio::io::ReadHalf<TcpStream>,
273        client_id: &str,
274        ctx: ReadContext,
275    ) {
276        let mut buf_reader = BufReader::new(reader);
277        let mut line_buf = String::new();
278
279        loop {
280            line_buf.clear();
281            match buf_reader.read_line(&mut line_buf).await {
282                Ok(0) => {
283                    // Connection closed — reset readiness for this process
284                    info!("Private API client disconnected: {client_id}");
285                    {
286                        let c = ctx.client.lock().await;
287                        if let Some(ref process) = c.process {
288                            Self::set_process_ready(
289                                process,
290                                &ctx.messages_ready,
291                                &ctx.facetime_ready,
292                                &ctx.findmy_ready,
293                                false,
294                            );
295                        }
296                    }
297                    let mut clients = ctx.clients.lock().await;
298                    clients.remove(client_id);
299                    break;
300                }
301                Ok(_) => {
302                    // Parse newline-delimited JSON events (may have multiple per read)
303                    let raw = line_buf.trim().to_string();
304                    if raw.is_empty() {
305                        continue;
306                    }
307
308                    // De-duplicate events in the same batch
309                    let events: HashSet<&str> = raw.split('\n').collect();
310                    for event_str in events {
311                        let event_str = event_str.trim();
312                        if event_str.is_empty() {
313                            continue;
314                        }
315
316                        match serde_json::from_str::<RawEvent>(event_str) {
317                            Ok(event) => {
318                                // Handle transaction responses
319                                if event.is_transaction_response() {
320                                    let txn_id = event.transaction_id.as_deref().unwrap();
321                                    if let Some(ref error) = event.error
322                                        && !error.is_empty()
323                                    {
324                                        ctx.txn_mgr.reject(txn_id, error).await;
325                                        continue;
326                                    }
327                                    let identifier = event.identifier.as_deref().unwrap_or("");
328                                    ctx.txn_mgr
329                                        .resolve(txn_id, identifier, event.extract_data())
330                                        .await;
331                                    continue;
332                                }
333
334                                // Handle ping events specially (register the process)
335                                if event.event.as_deref() == Some("ping")
336                                    && let Some(ref process) = event.process
337                                {
338                                    let mut c = ctx.client.lock().await;
339                                    c.process = Some(process.clone());
340                                    info!(
341                                        "Private API client registered: {} (process: {process})",
342                                        client_id
343                                    );
344                                }
345
346                                // Handle ready events (dylib has finished IMCore initialization)
347                                if event.event.as_deref() == Some("ready")
348                                    && let Some(ref process) = event.process
349                                {
350                                    Self::set_process_ready(
351                                        process,
352                                        &ctx.messages_ready,
353                                        &ctx.facetime_ready,
354                                        &ctx.findmy_ready,
355                                        true,
356                                    );
357                                    info!("Private API ready: {process}");
358                                }
359
360                                // Broadcast the event to subscribers
361                                let _ = ctx.event_tx.send(event);
362                            }
363                            Err(e) => {
364                                warn!("Failed to parse Private API event: {e} (data: {event_str})");
365                            }
366                        }
367                    }
368                }
369                Err(e) => {
370                    error!("Error reading from Private API client {client_id}: {e}");
371                    {
372                        let c = ctx.client.lock().await;
373                        if let Some(ref process) = c.process {
374                            Self::set_process_ready(
375                                process,
376                                &ctx.messages_ready,
377                                &ctx.facetime_ready,
378                                &ctx.findmy_ready,
379                                false,
380                            );
381                        }
382                    }
383                    let mut clients = ctx.clients.lock().await;
384                    clients.remove(client_id);
385                    break;
386                }
387            }
388        }
389    }
390
391    /// Set or clear the readiness flag for a process based on its bundle identifier.
392    fn set_process_ready(
393        process: &str,
394        messages_ready: &AtomicBool,
395        facetime_ready: &AtomicBool,
396        findmy_ready: &AtomicBool,
397        ready: bool,
398    ) {
399        match process {
400            "com.apple.MobileSMS" | "com.apple.Messages" => {
401                messages_ready.store(ready, Ordering::Release);
402            }
403            "com.apple.FaceTime" | "com.apple.TelephonyUtilities" => {
404                facetime_ready.store(ready, Ordering::Release);
405            }
406            "com.apple.findmy" => {
407                findmy_ready.store(ready, Ordering::Release);
408            }
409            _ => {}
410        }
411    }
412}
413
/// `Default` delegates to [`PrivateApiService::new`], so a default service
/// is configured exactly like one built with `new()`.
impl Default for PrivateApiService {
    fn default() -> Self {
        Self::new()
    }
}
419
// libc FFI for getuid
//
// A minimal hand-written binding kept in a local module named `libc`, so the
// call site in `calculate_port` reads `libc::getuid()`.
mod libc {
    unsafe extern "C" {
        /// Returns the calling process's user id.
        /// NOTE(review): declared as `u32`; assumes the target's `uid_t` is a
        /// 32-bit unsigned integer — confirm for every supported platform.
        pub fn getuid() -> u32;
    }
}
426
// Unit tests for the port arithmetic and for basic service construction.
// NOTE(review): the two port tests re-state the formula inline instead of
// calling `calculate_port`, so they do not exercise the production function.
#[cfg(test)]
mod tests {
    use super::*;

    /// The first regular user (uid 501) gets offset 0, i.e. `MIN_PORT`.
    #[test]
    fn port_calculation_first_user() {
        // uid 501 -> port 45670
        assert_eq!(
            MIN_PORT as u32 + 501u32.saturating_sub(501),
            MIN_PORT as u32
        );
    }

    /// A uid whose offset pushes the sum past `MAX_PORT` is clamped to `MAX_PORT`.
    #[test]
    fn port_clamped_to_max() {
        let uid = 100000u32;
        let port = MIN_PORT as u32 + uid.saturating_sub(501);
        let clamped = port.min(MAX_PORT as u32) as u16;
        assert_eq!(clamped, MAX_PORT);
    }

    /// A freshly constructed service has no clients, cleared readiness flags
    /// and no pending transactions.
    /// NOTE(review): despite its name this test never calls `start` or
    /// `stop`; the port override below is therefore not exercised.
    #[tokio::test]
    async fn service_starts_and_stops() {
        let mut service = PrivateApiService::new();
        // Override port to avoid conflict
        service.port = 0; // Let OS assign

        // We can't easily test the full TCP flow without a mock client,
        // but we can verify the service is constructable and the
        // transaction manager works.
        assert!(!service.is_connected().await);
        assert!(!service.is_messages_ready());
        assert!(!service.is_facetime_ready());
        assert_eq!(service.transaction_manager().pending_count().await, 0);
    }
}