1use std::collections::{HashMap, HashSet};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13use anyhow::{Result, bail};
14use serde_json::json;
15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
16use tokio::net::{TcpListener, TcpStream};
17use tokio::sync::{Mutex, Semaphore, broadcast};
18use tracing::{error, info, warn};
19
20use crate::actions::Action;
21use crate::events::RawEvent;
22use crate::transaction::{TransactionManager, TransactionResult};
23
24const MIN_PORT: u16 = 45670;
25const MAX_PORT: u16 = 65535;
26
27pub fn calculate_port() -> u16 {
29 let uid = unsafe { libc::getuid() };
30 let port = MIN_PORT as u32 + uid.saturating_sub(501);
31 port.min(MAX_PORT as u32) as u16
32}
33
34struct Client {
36 id: String,
37 writer: tokio::io::WriteHalf<TcpStream>,
38 process: Option<String>,
39}
40
41struct ReadContext {
43 client: Arc<Mutex<Client>>,
44 clients: Arc<Mutex<HashMap<String, Arc<Mutex<Client>>>>>,
45 event_tx: broadcast::Sender<RawEvent>,
46 txn_mgr: Arc<TransactionManager>,
47 messages_ready: Arc<AtomicBool>,
48 facetime_ready: Arc<AtomicBool>,
49 findmy_ready: Arc<AtomicBool>,
50}
51
52pub struct PrivateApiService {
54 port: u16,
55 transaction_manager: Arc<TransactionManager>,
56 write_lock: Arc<Semaphore>,
57 clients: Arc<Mutex<HashMap<String, Arc<Mutex<Client>>>>>,
58 event_tx: broadcast::Sender<RawEvent>,
59 shutdown_tx: Option<broadcast::Sender<()>>,
60 messages_ready: Arc<AtomicBool>,
63 facetime_ready: Arc<AtomicBool>,
64 findmy_ready: Arc<AtomicBool>,
65}
66
67impl PrivateApiService {
68 pub fn new() -> Self {
69 let (event_tx, _) = broadcast::channel(256);
70 Self {
71 port: calculate_port(),
72 transaction_manager: Arc::new(TransactionManager::new()),
73 write_lock: Arc::new(Semaphore::new(1)),
74 clients: Arc::new(Mutex::new(HashMap::new())),
75 event_tx,
76 shutdown_tx: None,
77 messages_ready: Arc::new(AtomicBool::new(false)),
78 facetime_ready: Arc::new(AtomicBool::new(false)),
79 findmy_ready: Arc::new(AtomicBool::new(false)),
80 }
81 }
82
83 pub fn subscribe_events(&self) -> broadcast::Receiver<RawEvent> {
85 self.event_tx.subscribe()
86 }
87
88 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
90 &self.transaction_manager
91 }
92
93 pub async fn is_connected(&self) -> bool {
95 let clients = self.clients.lock().await;
96 !clients.is_empty()
97 }
98
99 pub fn is_messages_ready(&self) -> bool {
101 self.messages_ready.load(Ordering::Acquire)
102 }
103
104 pub fn is_facetime_ready(&self) -> bool {
106 self.facetime_ready.load(Ordering::Acquire)
107 }
108
109 pub fn is_findmy_ready(&self) -> bool {
111 self.findmy_ready.load(Ordering::Acquire)
112 }
113
114 pub fn clear_findmy_ready(&self) {
116 self.findmy_ready.store(false, Ordering::Release);
117 }
118
119 pub async fn start(&mut self) -> Result<tokio::task::JoinHandle<()>> {
121 let addr = format!("127.0.0.1:{}", self.port);
122 let listener = TcpListener::bind(&addr).await?;
123 info!("Private API TCP server listening on {addr}");
124
125 let (shutdown_tx, _) = broadcast::channel(1);
126 self.shutdown_tx = Some(shutdown_tx.clone());
127
128 let clients = self.clients.clone();
129 let event_tx = self.event_tx.clone();
130 let txn_mgr = self.transaction_manager.clone();
131 let messages_ready = self.messages_ready.clone();
132 let facetime_ready = self.facetime_ready.clone();
133 let findmy_ready = self.findmy_ready.clone();
134
135 let handle = tokio::spawn(async move {
136 let mut shutdown_rx = shutdown_tx.subscribe();
137
138 loop {
139 tokio::select! {
140 result = listener.accept() => {
141 match result {
142 Ok((stream, addr)) => {
143 let client_id = uuid::Uuid::new_v4().to_string();
144 info!("Private API client connected: {addr} (id: {client_id})");
145
146 let (reader, writer) = tokio::io::split(stream);
147 let client = Arc::new(Mutex::new(Client {
148 id: client_id.clone(),
149 writer,
150 process: None,
151 }));
152
153 {
154 let mut clients = clients.lock().await;
155 clients.insert(client_id.clone(), client.clone());
156 }
157
158 let ctx = ReadContext {
160 client: client.clone(),
161 clients: clients.clone(),
162 event_tx: event_tx.clone(),
163 txn_mgr: txn_mgr.clone(),
164 messages_ready: messages_ready.clone(),
165 facetime_ready: facetime_ready.clone(),
166 findmy_ready: findmy_ready.clone(),
167 };
168 let client_id_clone = client_id.clone();
169
170 tokio::spawn(async move {
171 Self::handle_client_reads(
172 reader,
173 &client_id_clone,
174 ctx,
175 )
176 .await;
177 });
178 }
179 Err(e) => {
180 error!("Failed to accept TCP connection: {e}");
181 }
182 }
183 }
184 _ = shutdown_rx.recv() => {
185 info!("Private API TCP server shutting down");
186 break;
187 }
188 }
189 }
190 });
191
192 Ok(handle)
193 }
194
195 pub fn stop(&self) {
197 if let Some(ref tx) = self.shutdown_tx {
198 let _ = tx.send(());
199 }
200 }
201
202 pub async fn send_action(&self, action: Action) -> Result<Option<TransactionResult>> {
204 let permit = self.write_lock.clone().acquire_owned().await?;
206
207 let has_transaction = action.transaction_type.is_some();
208 let (transaction_id, rx) = if let Some(txn_type) = action.transaction_type {
209 let (id, rx) = self.transaction_manager.create(txn_type).await;
210 (Some(id), Some(rx))
211 } else {
212 (None, None)
213 };
214
215 let mut msg = json!({
217 "action": action.name,
218 "data": action.data,
219 });
220 if let Some(ref id) = transaction_id {
221 msg["transactionId"] = json!(id);
222 }
223
224 let wire = format!("{}\n", serde_json::to_string(&msg)?);
225
226 let clients = self.clients.lock().await;
228 let mut write_success = false;
229
230 for (_, client) in clients.iter() {
231 let mut client = client.lock().await;
232 match client.writer.write_all(wire.as_bytes()).await {
233 Ok(_) => {
234 write_success = true;
235 }
236 Err(e) => {
237 warn!("Failed to write to client {}: {e}", client.id);
238 }
239 }
240 }
241 drop(clients);
242
243 if !write_success && has_transaction {
244 if let Some(ref id) = transaction_id {
246 self.transaction_manager
247 .reject(id, "No connected clients")
248 .await;
249 }
250 }
251
252 tokio::spawn(async move {
254 tokio::time::sleep(Duration::from_millis(200)).await;
255 drop(permit);
256 });
257
258 if let Some(rx) = rx {
260 match rx.await {
261 Ok(Ok(result)) => Ok(Some(result)),
262 Ok(Err(e)) => bail!("Transaction error: {e}"),
263 Err(_) => bail!("Transaction channel closed"),
264 }
265 } else {
266 Ok(None)
267 }
268 }
269
270 async fn handle_client_reads(
272 reader: tokio::io::ReadHalf<TcpStream>,
273 client_id: &str,
274 ctx: ReadContext,
275 ) {
276 let mut buf_reader = BufReader::new(reader);
277 let mut line_buf = String::new();
278
279 loop {
280 line_buf.clear();
281 match buf_reader.read_line(&mut line_buf).await {
282 Ok(0) => {
283 info!("Private API client disconnected: {client_id}");
285 {
286 let c = ctx.client.lock().await;
287 if let Some(ref process) = c.process {
288 Self::set_process_ready(
289 process,
290 &ctx.messages_ready,
291 &ctx.facetime_ready,
292 &ctx.findmy_ready,
293 false,
294 );
295 }
296 }
297 let mut clients = ctx.clients.lock().await;
298 clients.remove(client_id);
299 break;
300 }
301 Ok(_) => {
302 let raw = line_buf.trim().to_string();
304 if raw.is_empty() {
305 continue;
306 }
307
308 let events: HashSet<&str> = raw.split('\n').collect();
310 for event_str in events {
311 let event_str = event_str.trim();
312 if event_str.is_empty() {
313 continue;
314 }
315
316 match serde_json::from_str::<RawEvent>(event_str) {
317 Ok(event) => {
318 if event.is_transaction_response() {
320 let txn_id = event.transaction_id.as_deref().unwrap();
321 if let Some(ref error) = event.error
322 && !error.is_empty()
323 {
324 ctx.txn_mgr.reject(txn_id, error).await;
325 continue;
326 }
327 let identifier = event.identifier.as_deref().unwrap_or("");
328 ctx.txn_mgr
329 .resolve(txn_id, identifier, event.extract_data())
330 .await;
331 continue;
332 }
333
334 if event.event.as_deref() == Some("ping")
336 && let Some(ref process) = event.process
337 {
338 let mut c = ctx.client.lock().await;
339 c.process = Some(process.clone());
340 info!(
341 "Private API client registered: {} (process: {process})",
342 client_id
343 );
344 }
345
346 if event.event.as_deref() == Some("ready")
348 && let Some(ref process) = event.process
349 {
350 Self::set_process_ready(
351 process,
352 &ctx.messages_ready,
353 &ctx.facetime_ready,
354 &ctx.findmy_ready,
355 true,
356 );
357 info!("Private API ready: {process}");
358 }
359
360 let _ = ctx.event_tx.send(event);
362 }
363 Err(e) => {
364 warn!("Failed to parse Private API event: {e} (data: {event_str})");
365 }
366 }
367 }
368 }
369 Err(e) => {
370 error!("Error reading from Private API client {client_id}: {e}");
371 {
372 let c = ctx.client.lock().await;
373 if let Some(ref process) = c.process {
374 Self::set_process_ready(
375 process,
376 &ctx.messages_ready,
377 &ctx.facetime_ready,
378 &ctx.findmy_ready,
379 false,
380 );
381 }
382 }
383 let mut clients = ctx.clients.lock().await;
384 clients.remove(client_id);
385 break;
386 }
387 }
388 }
389 }
390
391 fn set_process_ready(
393 process: &str,
394 messages_ready: &AtomicBool,
395 facetime_ready: &AtomicBool,
396 findmy_ready: &AtomicBool,
397 ready: bool,
398 ) {
399 match process {
400 "com.apple.MobileSMS" | "com.apple.Messages" => {
401 messages_ready.store(ready, Ordering::Release);
402 }
403 "com.apple.FaceTime" | "com.apple.TelephonyUtilities" => {
404 facetime_ready.store(ready, Ordering::Release);
405 }
406 "com.apple.findmy" => {
407 findmy_ready.store(ready, Ordering::Release);
408 }
409 _ => {}
410 }
411 }
412}
413
414impl Default for PrivateApiService {
415 fn default() -> Self {
416 Self::new()
417 }
418}
419
420mod libc {
422 unsafe extern "C" {
423 pub fn getuid() -> u32;
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430
431 #[test]
432 fn port_calculation_first_user() {
433 assert_eq!(
435 MIN_PORT as u32 + 501u32.saturating_sub(501),
436 MIN_PORT as u32
437 );
438 }
439
440 #[test]
441 fn port_clamped_to_max() {
442 let uid = 100000u32;
443 let port = MIN_PORT as u32 + uid.saturating_sub(501);
444 let clamped = port.min(MAX_PORT as u32) as u16;
445 assert_eq!(clamped, MAX_PORT);
446 }
447
448 #[tokio::test]
449 async fn service_starts_and_stops() {
450 let mut service = PrivateApiService::new();
451 service.port = 0; assert!(!service.is_connected().await);
458 assert!(!service.is_messages_ready());
459 assert!(!service.is_facetime_ready());
460 assert_eq!(service.transaction_manager().pending_count().await, 0);
461 }
462}