1use std::collections::VecDeque;
2
3use bytes::Bytes;
4use tokio::io::{AsyncRead, AsyncWrite};
5
6use super::tap::{TapClient, TapMessage};
7use crate::services::dtx::codec::DtxError;
8
9#[derive(Debug, Clone, PartialEq)]
10pub enum ActivityTraceValue {
11 Bytes(Bytes),
12 Null,
13 Struct(Vec<ActivityTraceValue>),
14}
15
16#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
17pub struct ActivityTraceEntry {
18 pub process: u32,
19 pub thread: u32,
20 pub subsystem: String,
21 pub category: String,
22 pub message_type: String,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub event_type: Option<String>,
25 pub sender_image_path: String,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub format_string: Option<String>,
28 #[serde(skip_serializing_if = "Option::is_none")]
29 pub name: Option<String>,
30 pub rendered_message: String,
31}
32
33#[derive(Debug, Clone)]
34struct ActivityTraceTable {
35 unknown0: Bytes,
36 unknown2: Bytes,
37 name: String,
38 columns: Vec<String>,
39}
40
41#[derive(Default)]
42pub struct ActivityTraceDecoder {
43 stack: Vec<ActivityTraceValue>,
44 generation: u32,
45 background: u32,
46 tables: Vec<ActivityTraceTable>,
47}
48
49pub struct ActivityTraceClient<S> {
50 tap: TapClient<S>,
51 decoder: ActivityTraceDecoder,
52 pending: VecDeque<ActivityTraceEntry>,
53 pid_filter: Option<u32>,
54}
55
56impl ActivityTraceDecoder {
57 pub fn decode_message(&mut self, message: &[u8]) -> Result<Vec<ActivityTraceEntry>, DtxError> {
58 if message.is_empty() || message.starts_with(b"bplist") {
59 return Ok(vec![]);
60 }
61
62 let mut entries = Vec::new();
63 let mut cursor = 0usize;
64 while cursor + 2 <= message.len() {
65 let word = read_word(message, &mut cursor)?;
66 let opcode = word >> 8;
67 let result = match opcode {
68 CMD_TABLE_RESET => {
69 self.handle_table_reset();
70 None
71 }
72 CMD_SENTINEL => {
73 self.stack.push(ActivityTraceValue::Null);
74 None
75 }
76 CMD_STRUCT => {
77 self.handle_struct((word & 0xFF) as u8)?;
78 None
79 }
80 CMD_DEFINE_TABLE => {
81 self.handle_define_table()?;
82 None
83 }
84 CMD_DEBUG => {
85 self.handle_debug(word)?;
86 None
87 }
88 CMD_COPY => {
89 self.handle_copy(word)?;
90 None
91 }
92 CMD_END_ROW => self.handle_end_row((word & 0xFF) as usize)?,
93 CMD_PLACEHOLDER_COUNT => {
94 self.handle_placeholder_count((word & 0xFF) as usize);
95 None
96 }
97 CMD_CONVERT_MACH_CONTINUOUS => None,
98 _ => {
99 self.stack.push(handle_push(message, &mut cursor, word)?);
100 None
101 }
102 };
103
104 if let Some(entry) = result {
105 entries.push(entry);
106 }
107 }
108
109 Ok(entries)
110 }
111
112 fn handle_table_reset(&mut self) {
113 self.generation += 1;
114 self.background = 0;
115 self.stack.clear();
116 }
117
118 fn handle_struct(&mut self, distance: u8) -> Result<(), DtxError> {
119 if distance == 0xFF {
120 return Err(DtxError::Protocol(
121 "activity trace long struct is not implemented".into(),
122 ));
123 }
124 let distance = usize::from(distance);
125 let item = ActivityTraceValue::Struct(self.pop_n(distance)?);
126 self.stack.push(item);
127 Ok(())
128 }
129
130 fn handle_define_table(&mut self) -> Result<(), DtxError> {
131 let items = self.pop_n(4)?;
132 let [unknown0, unknown2, name, columns] = items.try_into().map_err(|_| {
133 DtxError::Protocol("activity trace table definition did not contain four items".into())
134 })?;
135
136 let name = decode_trace_string(&name).unwrap_or_default();
137 let columns = match columns {
138 ActivityTraceValue::Struct(items) => items
139 .into_iter()
140 .filter_map(|item| decode_trace_string(&item))
141 .collect(),
142 _ => {
143 return Err(DtxError::Protocol(
144 "activity trace table columns were not a struct".into(),
145 ))
146 }
147 };
148
149 self.tables.push(ActivityTraceTable {
150 unknown0: into_bytes(unknown0),
151 unknown2: into_bytes(unknown2),
152 name,
153 columns,
154 });
155 Ok(())
156 }
157
158 fn handle_debug(&mut self, word: u16) -> Result<(), DtxError> {
159 let debug_id = word & 0xFF;
160 let Some(item) = self.stack.last() else {
161 return Err(DtxError::Protocol(format!(
162 "activity trace debug opcode {debug_id:#x} with empty stack"
163 )));
164 };
165 let reference = trace_value_to_u64(item)? as usize;
166 if reference != self.stack.len().saturating_sub(1) {
167 return Err(DtxError::Protocol(format!(
168 "activity trace debug reference mismatch: got {reference}, expected {}",
169 self.stack.len().saturating_sub(1)
170 )));
171 }
172 self.stack.pop();
173 Ok(())
174 }
175
176 fn handle_copy(&mut self, word: u16) -> Result<(), DtxError> {
177 let distance = usize::from((word & 0xFF) as u8);
178 if distance != 0xFF {
179 let index = self.stack.len().checked_sub(distance + 1).ok_or_else(|| {
180 DtxError::Protocol(format!(
181 "activity trace copy distance {distance} exceeds stack size {}",
182 self.stack.len()
183 ))
184 })?;
185 let item = self.stack[index].clone();
186 self.stack.push(item);
187 return Ok(());
188 }
189
190 let item = self.stack.pop().ok_or_else(|| {
191 DtxError::Protocol("activity trace long copy missing reference".into())
192 })?;
193 let reference = trace_value_to_u64(&item)?.checked_sub(1).ok_or_else(|| {
194 DtxError::Protocol("activity trace long copy reference underflow".into())
195 })? as usize;
196 let cloned = self.stack.get(reference).cloned().ok_or_else(|| {
197 DtxError::Protocol(format!(
198 "activity trace long copy reference {reference} out of bounds"
199 ))
200 })?;
201 self.stack.push(cloned);
202 Ok(())
203 }
204
205 fn handle_end_row(
206 &mut self,
207 generation: usize,
208 ) -> Result<Option<ActivityTraceEntry>, DtxError> {
209 let table = self
210 .tables
211 .get(generation)
212 .ok_or_else(|| {
213 DtxError::Protocol(format!(
214 "activity trace row references missing table generation {generation}"
215 ))
216 })?
217 .clone();
218 let _ = (&table.unknown0, &table.unknown2, &table.name);
219 let row = self.pop_n(table.columns.len())?;
220 let fields: std::collections::HashMap<_, _> =
221 table.columns.iter().cloned().zip(row).collect();
222
223 let Some(message_value) = fields.get("message") else {
224 return Ok(None);
225 };
226
227 let process = fields
228 .get("process")
229 .and_then(|value| trace_value_to_u32(value).ok())
230 .unwrap_or(0);
231 let thread = fields
232 .get("thread")
233 .and_then(|value| trace_value_to_u32(value).ok())
234 .unwrap_or(0);
235 let subsystem = fields
236 .get("subsystem")
237 .and_then(decode_trace_string)
238 .unwrap_or_default();
239 let category = fields
240 .get("category")
241 .and_then(decode_trace_string)
242 .unwrap_or_default();
243 let message_type = fields
244 .get("message_type")
245 .and_then(decode_trace_string)
246 .or_else(|| fields.get("event_type").and_then(decode_trace_string))
247 .unwrap_or_else(|| "unknown".to_string());
248 let event_type = fields.get("event_type").and_then(decode_trace_string);
249 let sender_image_path = fields
250 .get("sender_image_path")
251 .and_then(decode_trace_string)
252 .unwrap_or_default();
253 let format_string = fields.get("format_string").and_then(decode_trace_string);
254 let name = fields.get("name").and_then(decode_trace_string);
255 let rendered_message = match message_value {
256 ActivityTraceValue::Struct(items) => decode_message_format(items),
257 ActivityTraceValue::Null => name.clone().unwrap_or_default(),
258 _ => name.clone().unwrap_or_default(),
259 };
260
261 Ok(Some(ActivityTraceEntry {
262 process,
263 thread,
264 subsystem,
265 category,
266 message_type,
267 event_type,
268 sender_image_path,
269 format_string,
270 name,
271 rendered_message,
272 }))
273 }
274
275 fn handle_placeholder_count(&mut self, count: usize) {
276 if count == 0 {
277 return;
278 }
279 let new_len = self.stack.len().saturating_sub(count);
280 self.stack.truncate(new_len);
281 }
282
283 fn pop_n(&mut self, count: usize) -> Result<Vec<ActivityTraceValue>, DtxError> {
284 if self.stack.len() < count {
285 return Err(DtxError::Protocol(format!(
286 "activity trace stack underflow: need {count}, have {}",
287 self.stack.len()
288 )));
289 }
290 Ok(self.stack.split_off(self.stack.len() - count))
291 }
292}
293
294impl<S: AsyncRead + AsyncWrite + Unpin + Send> ActivityTraceClient<S> {
295 pub async fn connect(stream: S, pid: Option<u32>, enable_har: bool) -> Result<Self, DtxError> {
296 let tap = TapClient::connect(
297 stream,
298 super::ACTIVITY_TRACE_TAP_SVC,
299 activity_trace_config(pid, enable_har),
300 )
301 .await?;
302 Ok(Self {
303 tap,
304 decoder: ActivityTraceDecoder::default(),
305 pending: VecDeque::new(),
306 pid_filter: pid,
307 })
308 }
309
310 pub async fn next_entry(&mut self) -> Result<ActivityTraceEntry, DtxError> {
311 loop {
312 if let Some(entry) = self.pending.pop_front() {
313 if self.pid_filter.map_or(true, |pid| pid == entry.process) {
314 return Ok(entry);
315 }
316 }
317
318 match self.tap.next_message().await? {
319 TapMessage::Data(bytes) => match self.decoder.decode_message(&bytes) {
320 Ok(entries) => self.pending.extend(entries),
321 Err(error) => {
322 tracing::debug!("skipping undecodable activity trace frame: {error}");
323 }
324 },
325 TapMessage::Plist(_) => {}
326 }
327 }
328 }
329
330 pub async fn stop(&mut self) -> Result<(), DtxError> {
331 self.tap.stop().await
332 }
333}
334
335pub fn decode_message_format(message: &[ActivityTraceValue]) -> String {
336 let mut output = String::new();
337 for item in message {
338 let ActivityTraceValue::Struct(parts) = item else {
339 output.push_str(&format_trace_value(item));
340 continue;
341 };
342 if parts.len() != 2 {
343 output.push_str(&format_trace_value(item));
344 continue;
345 }
346
347 let Some(type_name) = decode_trace_string(&parts[0]) else {
348 output.push_str(&format_trace_value(item));
349 continue;
350 };
351 let normalized = if type_name == "address" {
352 "uint64-hex".to_string()
353 } else {
354 type_name
355 };
356
357 match normalized.as_str() {
358 "narrative-text" | "string" => match decode_trace_string(&parts[1]) {
359 Some(value) => output.push_str(&value),
360 None if matches!(parts[1], ActivityTraceValue::Null) => output.push_str("<None>"),
361 None => output.push_str(&format_trace_value(&parts[1])),
362 },
363 "private" => output.push_str("<private>"),
364 ty if ty.starts_with("uint64") => {
365 let value = trace_value_to_u64(&parts[1]).unwrap_or_default();
366 if ty.contains("hex") {
367 let rendered = if ty.contains("lowercase") {
368 format!("{value:x}")
369 } else {
370 format!("{value:X}")
371 };
372 output.push_str(&rendered);
373 } else {
374 output.push_str(&value.to_string());
375 }
376 }
377 ty if ty.contains("decimal") => {
378 output.push_str(
379 &trace_value_to_u64(&parts[1])
380 .unwrap_or_default()
381 .to_string(),
382 );
383 }
384 "data" | "uuid" => {
385 if let Some(bytes) = flatten_data_value(&parts[1]) {
386 output.push_str(&hex_string(&bytes));
387 }
388 }
389 _ => output.push_str(&format_trace_value(&parts[1])),
390 }
391 }
392 output
393}
394
395const CMD_DEFINE_TABLE: u16 = 1;
396const CMD_END_ROW: u16 = 2;
397const CMD_CONVERT_MACH_CONTINUOUS: u16 = 5;
398const CMD_TABLE_RESET: u16 = 0x64;
399const CMD_COPY: u16 = 0x65;
400const CMD_SENTINEL: u16 = 0x68;
401const CMD_STRUCT: u16 = 0x69;
402const CMD_PLACEHOLDER_COUNT: u16 = 0x6A;
403const CMD_DEBUG: u16 = 0x6B;
404
405fn activity_trace_config(pid: Option<u32>, enable_har: bool) -> Vec<(String, plist::Value)> {
406 let _ = pid;
407 vec![
408 ("bm".to_string(), plist::Value::Integer(0.into())),
409 (
410 "combineDataScope".to_string(),
411 plist::Value::Integer(0.into()),
412 ),
413 (
414 "machTimebaseDenom".to_string(),
415 plist::Value::Integer(3.into()),
416 ),
417 (
418 "machTimebaseNumer".to_string(),
419 plist::Value::Integer(125.into()),
420 ),
421 (
422 "onlySignposts".to_string(),
423 plist::Value::Integer(0.into()),
424 ),
425 (
426 "pidToInjectCombineDYLIB".to_string(),
427 plist::Value::String("-1".to_string()),
428 ),
429 (
430 "predicate".to_string(),
431 plist::Value::String(
432 "(messageType == info OR messageType == debug OR messageType == default OR messageType == error OR messageType == fault)"
433 .to_string(),
434 ),
435 ),
436 (
437 "signpostsAndLogs".to_string(),
438 plist::Value::Integer(1.into()),
439 ),
440 (
441 "trackPidToExecNameMapping".to_string(),
442 plist::Value::Boolean(true),
443 ),
444 (
445 "enableHTTPArchiveLogging".to_string(),
446 plist::Value::Boolean(enable_har),
447 ),
448 (
449 "targetPID".to_string(),
450 plist::Value::Integer((-3).into()),
451 ),
452 (
453 "trackExpiredPIDs".to_string(),
454 plist::Value::Integer(1.into()),
455 ),
456 ("ur".to_string(), plist::Value::Integer(500.into())),
457 ]
458}
459
460fn read_word(message: &[u8], cursor: &mut usize) -> Result<u16, DtxError> {
461 if *cursor + 2 > message.len() {
462 return Err(DtxError::Protocol(
463 "activity trace message ended mid-word".into(),
464 ));
465 }
466 let word = u16::from_le_bytes([message[*cursor], message[*cursor + 1]]);
467 *cursor += 2;
468 Ok(word)
469}
470
471fn handle_push(
472 message: &[u8],
473 cursor: &mut usize,
474 mut word: u16,
475) -> Result<ActivityTraceValue, DtxError> {
476 let class = word >> 14;
477 if class != 0b10 && class != 0b11 {
478 return Err(DtxError::Protocol(format!(
479 "invalid activity trace push word {word:#06x}"
480 )));
481 }
482
483 let mut chunks = Vec::new();
484 loop {
485 chunks.push(word & 0x3FFF);
486 if word >> 14 == 0b11 {
487 break;
488 }
489 word = read_word(message, cursor)?;
490 }
491
492 let bit_count = chunks.len() * 14;
493 let padding = 8 - (bit_count % 8);
494 let total_bits = bit_count + padding;
495 let mut bytes = vec![0u8; total_bits / 8];
496 let mut bit_pos = 0usize;
497 for chunk in chunks {
498 for shift in (0..14).rev() {
499 let bit = ((chunk >> shift) & 1) as u8;
500 if bit != 0 {
501 let byte_index = bit_pos / 8;
502 let bit_index = 7 - (bit_pos % 8);
503 bytes[byte_index] |= bit << bit_index;
504 }
505 bit_pos += 1;
506 }
507 }
508
509 Ok(ActivityTraceValue::Bytes(Bytes::from(bytes)))
510}
511
512fn decode_trace_string(value: &ActivityTraceValue) -> Option<String> {
513 let bytes = match value {
514 ActivityTraceValue::Bytes(bytes) => bytes.as_ref(),
515 ActivityTraceValue::Struct(items) => {
516 return items.first().and_then(decode_trace_string);
517 }
518 _ => return None,
519 };
520 let trimmed = bytes.split(|byte| *byte == 0).next().unwrap_or_default();
521 Some(String::from_utf8_lossy(trimmed).into_owned())
522}
523
524fn trace_value_to_u32(value: &ActivityTraceValue) -> Result<u32, DtxError> {
525 let bytes = trace_value_leaf_bytes(value)
526 .ok_or_else(|| DtxError::Protocol("activity trace expected byte payload for u32".into()))?;
527 let mut buf = [0u8; 4];
528 let count = bytes.len().min(4);
529 buf[..count].copy_from_slice(&bytes[..count]);
530 Ok(u32::from_le_bytes(buf))
531}
532
533fn trace_value_to_u64(value: &ActivityTraceValue) -> Result<u64, DtxError> {
534 let bytes = trace_value_leaf_bytes(value)
535 .ok_or_else(|| DtxError::Protocol("activity trace expected byte payload for u64".into()))?;
536 let mut buf = [0u8; 8];
537 let count = bytes.len().min(8);
538 buf[..count].copy_from_slice(&bytes[..count]);
539 Ok(u64::from_le_bytes(buf))
540}
541
542fn trace_value_leaf_bytes(value: &ActivityTraceValue) -> Option<&[u8]> {
543 match value {
544 ActivityTraceValue::Bytes(bytes) => Some(bytes),
545 ActivityTraceValue::Struct(items) => items.first().and_then(trace_value_leaf_bytes),
546 _ => None,
547 }
548}
549
550fn flatten_data_value(value: &ActivityTraceValue) -> Option<Vec<u8>> {
551 match value {
552 ActivityTraceValue::Bytes(bytes) => Some(strip_one_trailing_null(bytes).to_vec()),
553 ActivityTraceValue::Struct(items) => {
554 let mut out = Vec::new();
555 for item in items {
556 out.extend(flatten_data_value(item)?);
557 }
558 Some(out)
559 }
560 ActivityTraceValue::Null => None,
561 }
562}
563
564fn format_trace_value(value: &ActivityTraceValue) -> String {
565 match value {
566 ActivityTraceValue::Bytes(bytes) => {
567 let trimmed = bytes.split(|byte| *byte == 0).next().unwrap_or_default();
568 String::from_utf8_lossy(trimmed).into_owned()
569 }
570 ActivityTraceValue::Null => "<None>".to_string(),
571 ActivityTraceValue::Struct(items) => format!("{items:?}"),
572 }
573}
574
575fn hex_string(bytes: &[u8]) -> String {
576 bytes.iter().map(|byte| format!("{byte:02x}")).collect()
577}
578
579fn strip_one_trailing_null(bytes: &[u8]) -> &[u8] {
580 if bytes.last() == Some(&0) {
581 &bytes[..bytes.len() - 1]
582 } else {
583 bytes
584 }
585}
586
587fn into_bytes(value: ActivityTraceValue) -> Bytes {
588 match value {
589 ActivityTraceValue::Bytes(bytes) => bytes,
590 ActivityTraceValue::Null => Bytes::new(),
591 ActivityTraceValue::Struct(_) => Bytes::new(),
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598
599 const CMD_DEFINE_TABLE: u16 = 1;
600 const CMD_END_ROW: u16 = 2;
601 const CMD_TABLE_RESET: u16 = 0x64;
602 const CMD_STRUCT: u16 = 0x69;
603
604 #[test]
605 fn decodes_message_format() {
606 let message = vec![ActivityTraceValue::Struct(vec![
607 ActivityTraceValue::Bytes(Bytes::from_static(b"string\0\0")),
608 ActivityTraceValue::Bytes(Bytes::from_static(b"hello\0\0\0")),
609 ])];
610
611 assert_eq!(decode_message_format(&message), "hello");
612 }
613
614 #[test]
615 fn decodes_activity_trace_row() {
616 let mut decoder = ActivityTraceDecoder::default();
617 let payload = build_trace_message();
618 let entries = decoder.decode_message(&payload).unwrap();
619 assert_eq!(entries.len(), 1);
620
621 let entry = &entries[0];
622 assert_eq!(entry.process, 42);
623 assert_eq!(entry.thread, 7);
624 assert_eq!(entry.subsystem, "sub");
625 assert_eq!(entry.category, "cat");
626 assert_eq!(entry.message_type, "info");
627 assert_eq!(entry.sender_image_path, "app");
628 assert_eq!(entry.rendered_message, "hello");
629 }
630
631 fn build_trace_message() -> Vec<u8> {
632 let mut words = Vec::new();
633
634 words.push(opcode(CMD_TABLE_RESET, 0));
635
636 words.extend(push_bytes(&[0]));
637 words.extend(push_bytes(&[0]));
638 words.extend(push_string("logs"));
639
640 for column in [
641 "process",
642 "thread",
643 "subsystem",
644 "category",
645 "message_type",
646 "sender_image_path",
647 "message",
648 ] {
649 words.extend(push_string(column));
650 }
651 words.push(opcode(CMD_STRUCT, 7));
652 words.push(opcode(CMD_DEFINE_TABLE, 0));
653
654 words.extend(push_u32(42));
655 words.push(opcode(CMD_STRUCT, 1));
656 words.extend(push_u32(7));
657 words.push(opcode(CMD_STRUCT, 1));
658 words.extend(push_string("sub"));
659 words.extend(push_string("cat"));
660 words.extend(push_string("info"));
661 words.extend(push_string("app"));
662 words.extend(push_string("string"));
663 words.extend(push_string("hello"));
664 words.push(opcode(CMD_STRUCT, 2));
665 words.push(opcode(CMD_STRUCT, 1));
666 words.push(opcode(CMD_END_ROW, 0));
667
668 words
669 .into_iter()
670 .flat_map(u16::to_le_bytes)
671 .collect::<Vec<_>>()
672 }
673
674 fn push_string(value: &str) -> Vec<u16> {
675 let mut bytes = value.as_bytes().to_vec();
676 bytes.push(0);
677 push_bytes(&bytes)
678 }
679
680 fn push_u32(value: u32) -> Vec<u16> {
681 let mut bytes = vec![0u8; 4];
682 bytes[..4].copy_from_slice(&value.to_le_bytes());
683 push_bytes(&bytes)
684 }
685
686 fn push_bytes(bytes: &[u8]) -> Vec<u16> {
687 let group_count = bytes.len().div_ceil(7).max(1);
688 let total_data_bytes = group_count * 7;
689 let total_words = group_count * 4;
690 let mut data = vec![0u8; total_data_bytes];
691 data[..bytes.len()].copy_from_slice(bytes);
692
693 let mut bit_pos = 0usize;
694 let mut words = Vec::with_capacity(total_words);
695 for index in 0..total_words {
696 let mut chunk = 0u16;
697 for _ in 0..14 {
698 let bit = (data[bit_pos / 8] >> (7 - (bit_pos % 8))) & 1;
699 chunk = (chunk << 1) | u16::from(bit);
700 bit_pos += 1;
701 }
702 words.push(if index + 1 == total_words {
703 terminal_word(chunk)
704 } else {
705 continuation_word(chunk)
706 });
707 }
708 words
709 }
710
711 fn continuation_word(value: u16) -> u16 {
712 (0b10 << 14) | (value & 0x3fff)
713 }
714
715 fn terminal_word(value: u16) -> u16 {
716 (0b11 << 14) | (value & 0x3fff)
717 }
718
719 fn opcode(opcode: u16, arg: u8) -> u16 {
720 (opcode << 8) | u16::from(arg)
721 }
722}