Skip to main content

dbus_router/
session.rs

1//! Client session handling with dual upstream connections and routing
2
3use crate::auth;
4pub use crate::bus::Bus;
5use crate::config::Config;
6use crate::conn::connect_dbus;
7use crate::dbus::daemon::{
8    build_list_names_response, merge_list_names, needs_request_rewrite, needs_response_rewrite,
9    parse_string_array, rewrite_match_rule_body, rewrite_name_owner_changed,
10    rewrite_single_name_response, rewrite_string_array_response, rewrite_unique_name_request,
11    signal_needs_rewrite,
12};
13use crate::dbus::format::format_message;
14use crate::dbus::message::{self, read_message, Message, MessageType};
15use crate::dbus::rewrite::{rewrite_message_header, RewriteDirection};
16use crate::error::{Error, Result};
17use crate::routing::{DefaultRouting, RouteDecision, RoutingStrategy};
18use std::collections::{HashMap, HashSet};
19use std::path::PathBuf;
20use std::sync::Arc;
21use tokio::io::{AsyncWrite, AsyncWriteExt};
22use tokio::net::UnixStream;
23
24/// Log a message with all relevant fields for debugging.
25fn log_message(msg: &Message, direction: &str, target: Option<Bus>) {
26    let target_str = match target {
27        Some(Bus::Host) => " -> Host",
28        Some(Bus::Sandbox) => " -> Sandbox",
29        None => "",
30    };
31
32    tracing::trace!(
33        direction = direction,
34        target = target_str,
35        msg_type = ?msg.header.msg_type,
36        serial = msg.header.serial,
37        reply_serial = ?msg.header.reply_serial,
38        sender = ?msg.header.sender,
39        destination = ?msg.header.destination,
40        interface = ?msg.header.interface,
41        member = ?msg.header.member,
42        body_len = msg.header.body_len,
43        "{}",
44        format_message_summary(msg)
45    );
46
47    // Also emit dbus-monitor style log with separate target for easy filtering
48    tracing::trace!(
49        target: "dbus_monitor",
50        "{}",
51        format_message(msg, direction, target)
52    );
53}
54
55/// Format a one-line summary of the message for logging.
56fn format_message_summary(msg: &Message) -> String {
57    let type_str = match msg.header.msg_type {
58        MessageType::MethodCall => "CALL",
59        MessageType::MethodReturn => "REPLY",
60        MessageType::Error => "ERROR",
61        MessageType::Signal => "SIGNAL",
62        MessageType::Invalid => "INVALID",
63    };
64
65    let interface = msg.header.interface.as_deref().unwrap_or("-");
66    let member = msg.header.member.as_deref().unwrap_or("-");
67    let dest = msg.header.destination.as_deref().unwrap_or("-");
68    let sender = msg.header.sender.as_deref().unwrap_or("-");
69
70    format!(
71        "{} {}.{} [{}->{}] serial={}",
72        type_str, interface, member, sender, dest, msg.header.serial
73    )
74}
75
76/// Prepare a message from an upstream bus for forwarding to the client.
77/// Rewrites sender header and body as needed.
78fn prepare_message_for_client(
79    msg: &Message,
80    source_bus: Bus,
81    pending_calls: &HashMap<u32, PendingCallInfo>,
82) -> Vec<u8> {
83    let mut msg_for_client = msg.clone();
84
85    // Log message context for debugging header rewriting issues
86    tracing::trace!(
87        msg_type = ?msg.header.msg_type,
88        sender = ?msg.header.sender,
89        destination = ?msg.header.destination,
90        interface = ?msg.header.interface,
91        member = ?msg.header.member,
92        source_bus = ?source_bus,
93        "Preparing message for client"
94    );
95
96    // Rewrite sender header to add bus prefix
97    if let Err(e) =
98        rewrite_message_header(&mut msg_for_client, RewriteDirection::ToClient, source_bus)
99    {
100        tracing::warn!(
101            error = %e,
102            msg_type = ?msg.header.msg_type,
103            sender = ?msg.header.sender,
104            destination = ?msg.header.destination,
105            interface = ?msg.header.interface,
106            member = ?msg.header.member,
107            "Failed to rewrite sender header"
108        );
109    }
110
111    // Handle body rewriting based on message type
112    match msg_for_client.header.msg_type {
113        MessageType::MethodReturn => {
114            rewrite_method_return_body(&msg_for_client, source_bus, pending_calls)
115        }
116        MessageType::Signal => rewrite_signal_body(&msg_for_client, source_bus),
117        _ => msg_for_client.raw,
118    }
119}
120
121/// Rewrite method return body if it contains unique names that need prefixing.
122fn rewrite_method_return_body(
123    msg: &Message,
124    source_bus: Bus,
125    pending_calls: &HashMap<u32, PendingCallInfo>,
126) -> Vec<u8> {
127    let Some(reply_serial) = msg.header.reply_serial else {
128        return msg.raw.clone();
129    };
130
131    let Some(call_info) = pending_calls.get(&reply_serial) else {
132        return msg.raw.clone();
133    };
134
135    if call_info.bus != source_bus {
136        return msg.raw.clone();
137    }
138
139    let Some(ref member) = call_info.member else {
140        return msg.raw.clone();
141    };
142
143    if !needs_response_rewrite(member) {
144        return msg.raw.clone();
145    }
146
147    // ListQueuedOwners returns an array of unique names
148    let result = if member == "ListQueuedOwners" {
149        rewrite_string_array_response(msg, source_bus)
150    } else {
151        rewrite_single_name_response(msg, source_bus)
152    };
153
154    match result {
155        Ok(rewritten) => {
156            tracing::trace!(member = member, bus = ?source_bus, "Rewrote response body");
157            rewritten
158        }
159        Err(e) => {
160            tracing::warn!(member = member, error = %e, "Failed to rewrite response body");
161            msg.raw.clone()
162        }
163    }
164}
165
166/// Rewrite signal body if it contains unique names that need prefixing.
167fn rewrite_signal_body(msg: &Message, source_bus: Bus) -> Vec<u8> {
168    let Some(ref member) = msg.header.member else {
169        return msg.raw.clone();
170    };
171
172    if !signal_needs_rewrite(member) {
173        return msg.raw.clone();
174    }
175
176    match rewrite_name_owner_changed(msg, source_bus) {
177        Ok(rewritten) => {
178            tracing::trace!(member = member, bus = ?source_bus, "Rewrote signal body");
179            rewritten
180        }
181        Err(e) => {
182            tracing::warn!(member = member, error = %e, "Failed to rewrite signal body");
183            msg.raw.clone()
184        }
185    }
186}
187
/// Outcome of feeding one upstream reply into a pending merge.
enum MergeResult {
    /// This was the first of the two replies; it has been kept and nothing is
    /// ready to send yet.
    Stored,
    /// This was the second reply; the payload is the bytes of the message to
    /// send to the client.
    Complete(Vec<u8>),
}
195
196/// Process a ListNames/ListActivatableNames merge response.
197/// Returns `Some(MergeResult)` if the message was a merge response, `None` otherwise.
198fn process_merge_response(
199    msg: &Message,
200    source_bus: Bus,
201    pending_merges: &mut HashMap<u32, PendingMerge>,
202) -> Option<MergeResult> {
203    let reply_serial = msg.header.reply_serial?;
204    let pending = pending_merges.get_mut(&reply_serial)?;
205
206    let names = parse_string_array(msg).unwrap_or_default();
207    tracing::trace!(
208        serial = reply_serial,
209        count = names.len(),
210        bus = ?source_bus,
211        "Received ListNames response"
212    );
213
214    let Some((first_bus, first_names)) = pending.first_response.take() else {
215        // First response - store it
216        pending.first_response = Some((source_bus, names));
217        return Some(MergeResult::Stored);
218    };
219
220    // Second response - merge and build final message
221    let (host_names, sandbox_names) = if first_bus == Bus::Host {
222        (first_names, names)
223    } else {
224        (names, first_names)
225    };
226
227    let merged = merge_list_names(host_names, sandbox_names);
228    tracing::trace!(
229        serial = reply_serial,
230        count = merged.len(),
231        "Merged ListNames response"
232    );
233
234    let pending = pending_merges.remove(&reply_serial).unwrap();
235    let response = match build_list_names_response(&pending.original_request, merged) {
236        Ok(response) => response,
237        Err(e) => {
238            tracing::warn!(error = %e, "Failed to build merged response");
239            msg.raw.clone()
240        }
241    };
242
243    Some(MergeResult::Complete(response))
244}
245
246/// Pending merge state for ListNames/ListActivatableNames
247#[derive(Debug)]
248struct PendingMerge {
249    /// The original request message (for building response)
250    original_request: Message,
251    /// First response received (None if waiting for first)
252    first_response: Option<(Bus, Vec<String>)>,
253}
254
255/// Info about a pending call (for response rewriting)
256#[derive(Debug, Clone)]
257struct PendingCallInfo {
258    /// Which bus the call was sent to
259    bus: Bus,
260    /// The member (method name) of the call (for response rewriting)
261    member: Option<String>,
262}
263
264/// A client session with connections to both upstream buses.
265pub struct Session {
266    /// Connection from sandbox app
267    client: UnixStream,
268    /// Connection to host session bus
269    host_bus: UnixStream,
270    /// Connection to sandbox session bus (default target)
271    sandbox_bus: UnixStream,
272    /// Routing configuration
273    config: Arc<Config>,
274    /// Track which bus each outgoing serial was sent to (for routing replies)
275    pending_calls: HashMap<u32, PendingCallInfo>,
276    /// Client process executable path (for sandbox export permission check)
277    client_exe_path: Option<PathBuf>,
278    /// Services exported by this client to the host bus
279    exported_services: HashSet<String>,
280    /// Services registered by this client on the sandbox bus
281    sandbox_services: HashSet<String>,
282    /// Track incoming calls from upstream buses (serial -> source bus)
283    incoming_calls: HashMap<u32, Bus>,
284    /// Pending ListNames/ListActivatableNames merges (serial -> state)
285    pending_merges: HashMap<u32, PendingMerge>,
286}
287
288/// Get the executable path of a peer process from a Unix socket.
289#[cfg(target_os = "linux")]
290fn get_peer_exe_path(stream: &UnixStream) -> Option<PathBuf> {
291    use std::os::unix::io::AsRawFd;
292
293    let fd = stream.as_raw_fd();
294
295    // Get peer credentials using SO_PEERCRED
296    let mut ucred: libc::ucred = unsafe { std::mem::zeroed() };
297    let mut len = std::mem::size_of::<libc::ucred>() as libc::socklen_t;
298
299    let ret = unsafe {
300        libc::getsockopt(
301            fd,
302            libc::SOL_SOCKET,
303            libc::SO_PEERCRED,
304            &mut ucred as *mut _ as *mut libc::c_void,
305            &mut len,
306        )
307    };
308
309    if ret != 0 {
310        tracing::debug!(
311            "Failed to get peer credentials: {}",
312            std::io::Error::last_os_error()
313        );
314        return None;
315    }
316
317    let pid = ucred.pid;
318    if pid <= 0 {
319        return None;
320    }
321
322    // Read /proc/{pid}/exe symlink
323    let exe_path = format!("/proc/{}/exe", pid);
324    match std::fs::read_link(&exe_path) {
325        Ok(path) => {
326            tracing::debug!(pid = pid, exe = %path.display(), "Got peer exe path");
327            Some(path)
328        }
329        Err(e) => {
330            tracing::debug!(pid = pid, error = %e, "Failed to read exe path");
331            None
332        }
333    }
334}
335
/// Fallback for non-Linux targets: no peer lookup is attempted here, so the
/// executable path is always reported as unknown.
#[cfg(not(target_os = "linux"))]
fn get_peer_exe_path(_stream: &UnixStream) -> Option<PathBuf> {
    None
}
340
341struct UpstreamContext<'a> {
342    pending_merges: &'a mut HashMap<u32, PendingMerge>,
343    pending_calls: &'a HashMap<u32, PendingCallInfo>,
344    incoming_calls: &'a mut HashMap<u32, Bus>,
345    exported_services: &'a HashSet<String>,
346    sandbox_services: &'a HashSet<String>,
347}
348
349/// Handle a message from an upstream bus and forward to the client.
350async fn handle_upstream_message(
351    msg: Message,
352    source_bus: Bus,
353    client_write: &mut (impl AsyncWrite + Unpin),
354    ctx: &mut UpstreamContext<'_>,
355) -> Result<()> {
356    let (service_label, default_label, rewrite_sender_for_service) = match source_bus {
357        Bus::Host => ("Host->Client(exported)", "Host->Client", false),
358        Bus::Sandbox => ("Sandbox->Client(service)", "Sandbox->Client", true),
359    };
360
361    let is_service_call = msg.header.msg_type == MessageType::MethodCall
362        && msg
363            .header
364            .destination
365            .as_deref()
366                .map(|dest| match source_bus {
367                Bus::Host => ctx.exported_services.contains(dest),
368                Bus::Sandbox => ctx.sandbox_services.contains(dest),
369            })
370            .unwrap_or(false);
371
372    if is_service_call {
373        let mut msg_for_client = msg;
374        log_message(&msg_for_client, service_label, None);
375        ctx.incoming_calls
376            .insert(msg_for_client.header.serial, source_bus);
377
378        if rewrite_sender_for_service {
379            if let Err(e) =
380                rewrite_message_header(&mut msg_for_client, RewriteDirection::ToClient, source_bus)
381            {
382                tracing::warn!(
383                    error = %e,
384                    "Failed to rewrite sender header for sandbox service call"
385                );
386            }
387        }
388
389        client_write.write_all(&msg_for_client.raw).await?;
390        return Ok(());
391    }
392
393    if let Some(result) = process_merge_response(&msg, source_bus, ctx.pending_merges) {
394        if let MergeResult::Complete(response) = result {
395            client_write.write_all(&response).await?;
396        }
397        return Ok(());
398    }
399
400    let msg_to_send = prepare_message_for_client(&msg, source_bus, ctx.pending_calls);
401    log_message(&msg, default_label, None);
402    client_write.write_all(&msg_to_send).await?;
403    Ok(())
404}
405
406impl Session {
407    /// Create a new session with connections to both upstream buses.
408    pub async fn new(
409        client: UnixStream,
410        host_addr: &str,
411        sandbox_addr: &str,
412        config: Arc<Config>,
413    ) -> Result<Self> {
414        let client_exe_path = get_peer_exe_path(&client);
415        let host_bus = connect_dbus(host_addr).await?;
416        let sandbox_bus = connect_dbus(sandbox_addr).await?;
417
418        Ok(Self {
419            client,
420            host_bus,
421            sandbox_bus,
422            config,
423            pending_calls: HashMap::new(),
424            client_exe_path,
425            exported_services: HashSet::new(),
426            sandbox_services: HashSet::new(),
427            incoming_calls: HashMap::new(),
428            pending_merges: HashMap::new(),
429        })
430    }
431
432    /// Run the session: authenticate with both buses, then forward messages.
433    pub async fn run(mut self) -> Result<()> {
434        // Check if this is a hostpass client
435        let is_hostpass = self
436            .client_exe_path
437            .as_ref()
438            .map(|p| self.config.has_hostpass(p))
439            .unwrap_or(false);
440
441        // Phase 1: Auth passthrough with sandbox bus
442        tracing::debug!("Starting auth phase with sandbox bus");
443        auth::auth_passthrough(&mut self.client, &mut self.sandbox_bus).await?;
444        tracing::info!("Auth with sandbox bus completed");
445
446        // Phase 1b: Also authenticate with host bus (using same credentials)
447        // For hostpass clients, skip Hello() - their Hello() will be forwarded to host bus
448        // For non-hostpass clients, send Hello() so host_routes messages can be routed
449        tracing::debug!("Starting auth phase with host bus");
450        self.auth_host_bus(is_hostpass).await?;
451        tracing::info!("Auth with host bus completed, starting message forwarding");
452
453        // Phase 2: Message forwarding with routing
454        self.forward_loop().await
455    }
456
457    /// Authenticate with the host bus.
458    /// The host bus needs its own auth handshake.
459    /// If `skip_hello` is true, skip the Hello() call (for hostpass clients whose
460    /// Hello() will be forwarded to the host bus).
461    async fn auth_host_bus(&mut self, skip_hello: bool) -> Result<()> {
462        // Send null byte and EXTERNAL auth with hex-encoded UID
463        self.host_bus.write_all(&[0]).await?;
464        let uid = unsafe { libc::getuid() };
465        let uid_hex: String = uid
466            .to_string()
467            .bytes()
468            .map(|b| format!("{:02x}", b))
469            .collect();
470        self.host_bus
471            .write_all(format!("AUTH EXTERNAL {}\r\n", uid_hex).as_bytes())
472            .await?;
473
474        // Read auth response
475        let response = auth::read_auth_line_string(&mut self.host_bus).await?;
476        if !response.starts_with("OK") {
477            return Err(Error::Auth(format!(
478                "Host bus auth failed: {}",
479                response.trim()
480            )));
481        }
482
483        // Negotiate UNIX FD passing
484        self.host_bus.write_all(b"NEGOTIATE_UNIX_FD\r\n").await?;
485        let response = auth::read_auth_line_string(&mut self.host_bus).await?;
486        tracing::debug!(response = %response.trim(), "Host bus NEGOTIATE_UNIX_FD response");
487
488        // Send BEGIN to complete SASL auth
489        self.host_bus.write_all(b"BEGIN\r\n").await?;
490
491        if skip_hello {
492            tracing::debug!("Skipping Hello() for hostpass client");
493            Ok(())
494        } else {
495            self.send_host_hello().await
496        }
497    }
498
499    /// Send Hello() method call to host bus and read the response.
500    /// This registers the router's connection with the host bus daemon.
501    async fn send_host_hello(&mut self) -> Result<()> {
502        use zvariant::{serialized::Context, to_bytes, ObjectPath, Value, LE};
503
504        // Build header fields array for Hello() call
505        let path = ObjectPath::try_from("/org/freedesktop/DBus").unwrap();
506        let fields: Vec<(u8, Value)> = vec![
507            (1, Value::ObjectPath(path)),                   // PATH
508            (2, Value::Str("org.freedesktop.DBus".into())), // INTERFACE
509            (3, Value::Str("Hello".into())),                // MEMBER
510            (6, Value::Str("org.freedesktop.DBus".into())), // DESTINATION
511        ];
512
513        let ctxt = Context::new_dbus(LE, 12);
514        let fields_encoded = to_bytes(ctxt, &fields)?;
515        let array_len = fields_encoded.len() - 4; // Exclude 4-byte length prefix
516
517        // Calculate padding to 8-byte boundary
518        let header_end = 16 + array_len;
519        let padding = (8 - (header_end % 8)) % 8;
520
521        // Build D-Bus message: fixed header + fields + padding
522        let mut msg = Vec::with_capacity(16 + array_len + padding);
523        msg.extend_from_slice(&[b'l', 1, 0, 1]); // endian, method_call, flags, version
524        msg.extend_from_slice(&0u32.to_le_bytes()); // body length
525        msg.extend_from_slice(&1u32.to_le_bytes()); // serial
526        msg.extend_from_slice(&fields_encoded);
527        msg.resize(msg.len() + padding, 0);
528
529        self.host_bus.write_all(&msg).await?;
530
531        // Read and validate response.
532        // After Hello(), the bus daemon sends:
533        // 1. MethodReturn with our unique name
534        // 2. NameAcquired signal for that name
535        // We must consume both to prevent the signal from being forwarded to client,
536        // which would cause "Unexpected message Signal" errors in strict clients like zbus.
537        //
538        // First, read the MethodReturn
539        match message::read_message(&mut self.host_bus).await? {
540            Some(resp) if resp.header.msg_type == MessageType::MethodReturn => {
541                tracing::debug!("Host bus Hello() MethodReturn received");
542            }
543            Some(resp) if resp.header.msg_type == MessageType::Error => {
544                return Err(Error::Protocol(
545                    "Host bus Hello() failed with error".to_string(),
546                ));
547            }
548            Some(resp) => {
549                return Err(Error::Protocol(format!(
550                    "Unexpected response to Hello(): {:?}",
551                    resp.header.msg_type
552                )));
553            }
554            None => {
555                return Err(Error::Protocol(
556                    "Host bus disconnected after Hello()".to_string(),
557                ));
558            }
559        }
560
561        // Then, read the NameAcquired signal
562        match message::read_message(&mut self.host_bus).await? {
563            Some(resp) if resp.header.msg_type == MessageType::Signal => {
564                tracing::debug!(
565                    interface = ?resp.header.interface,
566                    member = ?resp.header.member,
567                    "Consumed NameAcquired signal after Hello()"
568                );
569            }
570            Some(resp) => {
571                tracing::warn!(
572                    msg_type = ?resp.header.msg_type,
573                    "Unexpected second message after Hello(), expected Signal"
574                );
575            }
576            None => {
577                return Err(Error::Protocol(
578                    "Host bus disconnected after Hello()".to_string(),
579                ));
580            }
581        }
582
583        tracing::debug!("Host bus Hello() completed");
584        Ok(())
585    }
586
587    /// Forward messages between client and upstream buses with routing.
588    async fn forward_loop(mut self) -> Result<()> {
589        // Check if this is a hostpass client (they only use host bus)
590        let is_hostpass = self
591            .client_exe_path
592            .as_ref()
593            .map(|p| self.config.has_hostpass(p))
594            .unwrap_or(false);
595
596        let (client_read, mut client_write) = self.client.split();
597        let (host_read, mut host_write) = self.host_bus.split();
598        let (sandbox_read, mut sandbox_write) = self.sandbox_bus.split();
599
600        let mut client_read = tokio::io::BufReader::new(client_read);
601        let mut host_read = tokio::io::BufReader::new(host_read);
602        let mut sandbox_read = tokio::io::BufReader::new(sandbox_read);
603
604        // Track if sandbox bus is still active (for hostpass clients that survive sandbox disconnect)
605        let mut sandbox_active = true;
606
607        let routing = DefaultRouting;
608        loop {
609            tokio::select! {
610                biased;
611                // Read from client and route to appropriate bus
612                result = read_message(&mut client_read) => {
613                    match result {
614                        Ok(Some(msg)) => {
615                            // Check if this is a reply to an incoming call from upstream bus
616                            if matches!(msg.header.msg_type, MessageType::MethodReturn | MessageType::Error) {
617                                if let Some(reply_serial) = msg.header.reply_serial {
618                                    if let Some(source_bus) = self.incoming_calls.remove(&reply_serial) {
619                                        // Rewrite destination from fake name to real name
620                                        let mut msg_for_bus = msg.clone();
621                                        if let Err(e) = rewrite_message_header(
622                                            &mut msg_for_bus,
623                                            RewriteDirection::ToUpstream,
624                                            source_bus,
625                                        ) {
626                                            tracing::warn!(error = %e, "Failed to rewrite destination for reply");
627                                        }
628
629                                        match source_bus {
630                                            Bus::Host => {
631                                                log_message(&msg, "Client->Host(reply)", Some(Bus::Host));
632                                                host_write.write_all(&msg_for_bus.raw).await?;
633                                            }
634                                            Bus::Sandbox => {
635                                                log_message(&msg, "Client->Sandbox(reply)", Some(Bus::Sandbox));
636                                                sandbox_write.write_all(&msg_for_bus.raw).await?;
637                                            }
638                                        }
639                                        continue;
640                                    }
641                                }
642                            }
643
644                            let decision = routing.route(
645                                &self.config,
646                                &msg,
647                                self.client_exe_path.as_deref(),
648                            );
649
650                            // Track RequestName calls to know which services this client exports
651                            if msg.is_request_name() {
652                                if let Some(name) = msg.extract_name_from_body() {
653                                    match decision {
654                                        RouteDecision::Single(Bus::Host) => {
655                                            tracing::info!(service = %name, "Client exporting service to host bus");
656                                            self.exported_services.insert(name);
657                                        }
658                                        RouteDecision::Single(Bus::Sandbox) => {
659                                            tracing::info!(service = %name, "Client registering service on sandbox bus");
660                                            self.sandbox_services.insert(name);
661                                        }
662                                        _ => {}
663                                    }
664                                }
665                            }
666
667                            match decision {
668                                RouteDecision::Single(target) => {
669                                    // Prepare message for sending to upstream
670                                    let mut msg_for_upstream = msg.clone();
671
672                                    // 1. Rewrite destination header if it's a fake unique name
673                                    if let Err(e) = rewrite_message_header(
674                                        &mut msg_for_upstream,
675                                        RewriteDirection::ToUpstream,
676                                        target, // source_bus not used for ToUpstream
677                                    ) {
678                                        tracing::warn!(error = %e, "Failed to rewrite destination header");
679                                    }
680
681                                    // 2. Rewrite body for org.freedesktop.DBus methods
682                                    let msg_to_send = if msg_for_upstream.header.destination.as_deref() == Some("org.freedesktop.DBus") {
683                                        if let Some(member) = msg_for_upstream.header.member.as_deref() {
684                                            if needs_request_rewrite(member) {
685                                                // Rewrite unique name in body (GetConnectionCredentials etc.)
686                                                match rewrite_unique_name_request(&msg_for_upstream) {
687                                                    Ok((rewritten, _bus)) => {
688                                                        tracing::trace!(
689                                                            member = member,
690                                                            "Rewrote request body to remove fake prefix"
691                                                        );
692                                                        rewritten
693                                                    }
694                                                    Err(e) => {
695                                                        tracing::warn!(
696                                                            member = member,
697                                                            error = %e,
698                                                            "Failed to rewrite request body"
699                                                        );
700                                                        msg_for_upstream.raw.clone()
701                                                    }
702                                                }
703                                            } else if member == "AddMatch" || member == "RemoveMatch" {
704                                                // Rewrite sender in match rule
705                                                match rewrite_match_rule_body(&msg_for_upstream) {
706                                                    Ok(Some(rewritten)) => {
707                                                        tracing::trace!(
708                                                            member = member,
709                                                            "Rewrote match rule sender"
710                                                        );
711                                                        rewritten
712                                                    }
713                                                    Ok(None) => msg_for_upstream.raw.clone(),
714                                                    Err(e) => {
715                                                        tracing::warn!(
716                                                            member = member,
717                                                            error = %e,
718                                                            "Failed to rewrite match rule"
719                                                        );
720                                                        msg_for_upstream.raw.clone()
721                                                    }
722                                                }
723                                            } else {
724                                                msg_for_upstream.raw.clone()
725                                            }
726                                        } else {
727                                            msg_for_upstream.raw.clone()
728                                        }
729                                    } else {
730                                        msg_for_upstream.raw.clone()
731                                    };
732
733                                    self.pending_calls.insert(msg.header.serial, PendingCallInfo {
734                                        bus: target,
735                                        member: msg.header.member.clone(),
736                                    });
737                                    log_message(&msg, "Client", Some(target));
738
739                                    match target {
740                                        Bus::Host => host_write.write_all(&msg_to_send).await?,
741                                        Bus::Sandbox => sandbox_write.write_all(&msg_to_send).await?,
742                                    }
743                                }
744                                RouteDecision::Both => {
745                                    // Send to both buses (e.g., AddMatch without sender)
746                                    // Rewrite match rule body if needed
747                                    let msg_to_send = if msg.header.member.as_deref() == Some("AddMatch")
748                                        || msg.header.member.as_deref() == Some("RemoveMatch")
749                                    {
750                                        match rewrite_match_rule_body(&msg) {
751                                            Ok(Some(rewritten)) => rewritten,
752                                            Ok(None) => msg.raw.clone(),
753                                            Err(_) => msg.raw.clone(),
754                                        }
755                                    } else {
756                                        msg.raw.clone()
757                                    };
758
759                                    self.pending_calls.insert(msg.header.serial, PendingCallInfo {
760                                        bus: Bus::Sandbox,
761                                        member: msg.header.member.clone(),
762                                    });
763                                    log_message(&msg, "Client->Both", None);
764
765                                    host_write.write_all(&msg_to_send).await?;
766                                    sandbox_write.write_all(&msg_to_send).await?;
767                                }
768                                RouteDecision::Merge => {
769                                    // ListNames/ListActivatableNames: send to both and merge results
770                                    log_message(&msg, "Client->Merge", None);
771
772                                    self.pending_merges.insert(msg.header.serial, PendingMerge {
773                                        original_request: msg.clone(),
774                                        first_response: None,
775                                    });
776
777                                    host_write.write_all(&msg.raw).await?;
778                                    sandbox_write.write_all(&msg.raw).await?;
779                                }
780                            }
781                        }
782                        Ok(None) => {
783                            tracing::debug!("Client disconnected");
784                            return Ok(());
785                        }
786                        Err(e) => {
787                            tracing::debug!(error = %e, "Error reading from client");
788                            return Ok(());
789                        }
790                    }
791                }
792
793                // Read from host bus and forward to client
794                result = read_message(&mut host_read) => {
795                    match result {
796                        Ok(Some(msg)) => {
797                            let mut ctx = UpstreamContext {
798                                pending_merges: &mut self.pending_merges,
799                                pending_calls: &self.pending_calls,
800                                incoming_calls: &mut self.incoming_calls,
801                                exported_services: &self.exported_services,
802                                sandbox_services: &self.sandbox_services,
803                            };
804                            handle_upstream_message(
805                                msg,
806                                Bus::Host,
807                                &mut client_write,
808                                &mut ctx,
809                            )
810                            .await?;
811                        }
812                        Ok(None) => {
813                            tracing::debug!("Host bus disconnected");
814                            return Ok(());
815                        }
816                        Err(e) => {
817                            tracing::debug!(error = %e, "Error reading from host bus");
818                            return Ok(());
819                        }
820                    }
821                }
822
823                // Read from sandbox bus and forward to client
824                result = read_message(&mut sandbox_read), if sandbox_active => {
825                    match result {
826                        Ok(Some(msg)) => {
827                            let mut ctx = UpstreamContext {
828                                pending_merges: &mut self.pending_merges,
829                                pending_calls: &self.pending_calls,
830                                incoming_calls: &mut self.incoming_calls,
831                                exported_services: &self.exported_services,
832                                sandbox_services: &self.sandbox_services,
833                            };
834                            handle_upstream_message(
835                                msg,
836                                Bus::Sandbox,
837                                &mut client_write,
838                                &mut ctx,
839                            )
840                            .await?;
841                        }
842                        Ok(None) => {
843                            tracing::debug!("Sandbox bus disconnected");
844                            if is_hostpass {
845                                // Hostpass clients only use host bus, so they can continue
846                                tracing::info!("Hostpass client continues after sandbox disconnect");
847                                sandbox_active = false;
848                                continue;
849                            }
850                            return Ok(());
851                        }
852                        Err(e) => {
853                            tracing::debug!(error = %e, "Error reading from sandbox bus");
854                            if is_hostpass {
855                                tracing::info!("Hostpass client continues after sandbox read error");
856                                sandbox_active = false;
857                                continue;
858                            }
859                            return Ok(());
860                        }
861                    }
862                }
863            }
864        }
865    }
866}