Skip to main content

gritty/
protocol.rs

1use bytes::{Buf, BufMut, Bytes, BytesMut};
2use std::io;
3use tokio_util::codec::{Decoder, Encoder};
4
5const TYPE_DATA: u8 = 0x01;
6const TYPE_RESIZE: u8 = 0x02;
7const TYPE_EXIT: u8 = 0x03;
8const TYPE_DETACHED: u8 = 0x04;
9const TYPE_PING: u8 = 0x05;
10const TYPE_PONG: u8 = 0x06;
11const TYPE_ENV: u8 = 0x07;
12const TYPE_AGENT_FORWARD: u8 = 0x08;
13const TYPE_AGENT_OPEN: u8 = 0x09;
14const TYPE_AGENT_DATA: u8 = 0x0A;
15const TYPE_AGENT_CLOSE: u8 = 0x0B;
16const TYPE_OPEN_FORWARD: u8 = 0x0C;
17const TYPE_OPEN_URL: u8 = 0x0D;
18const TYPE_TUNNEL_LISTEN: u8 = 0x0E;
19const TYPE_TUNNEL_OPEN: u8 = 0x0F;
20const TYPE_NEW_SESSION: u8 = 0x10;
21const TYPE_ATTACH: u8 = 0x11;
22const TYPE_LIST_SESSIONS: u8 = 0x12;
23const TYPE_KILL_SESSION: u8 = 0x13;
24const TYPE_KILL_SERVER: u8 = 0x14;
25const TYPE_TAIL: u8 = 0x15;
26const TYPE_HELLO: u8 = 0x16;
27const TYPE_TUNNEL_DATA: u8 = 0x17;
28const TYPE_TUNNEL_CLOSE: u8 = 0x18;
29const TYPE_SEND_OFFER: u8 = 0x19;
30const TYPE_SEND_DONE: u8 = 0x1A;
31const TYPE_SEND_CANCEL: u8 = 0x1B;
32const TYPE_SESSION_CREATED: u8 = 0x20;
33const TYPE_SESSION_INFO: u8 = 0x21;
34const TYPE_OK: u8 = 0x22;
35const TYPE_ERROR: u8 = 0x23;
36const TYPE_HELLO_ACK: u8 = 0x24;
37const TYPE_SEND_FILE: u8 = 0x25;
38const TYPE_PORT_FORWARD_LISTEN: u8 = 0x1C;
39const TYPE_PORT_FORWARD_READY: u8 = 0x1D;
40const TYPE_PORT_FORWARD_OPEN: u8 = 0x1E;
41const TYPE_PORT_FORWARD_DATA: u8 = 0x1F;
42const TYPE_PORT_FORWARD_CLOSE: u8 = 0x26;
43const TYPE_PORT_FORWARD_STOP: u8 = 0x27;
44const TYPE_RENAME_SESSION: u8 = 0x28;
45
46const HEADER_LEN: usize = 5; // type(1) + length(4)
47const MAX_FRAME_SIZE: usize = 1 << 20; // 1 MB
48
49/// Protocol version for handshake negotiation.
50pub const PROTOCOL_VERSION: u16 = 3;
51
52/// Discriminator byte for the unified per-session service socket (`svc-{id}.sock`).
53/// Sent as the first byte on every connection to route to the correct handler.
54#[repr(u8)]
55pub enum SvcRequest {
56    OpenUrl = 1,
57    Send = 2,
58    Receive = 3,
59    PortForward = 4,
60}
61
62impl SvcRequest {
63    pub fn from_byte(b: u8) -> Option<Self> {
64        match b {
65            1 => Some(Self::OpenUrl),
66            2 => Some(Self::Send),
67            3 => Some(Self::Receive),
68            4 => Some(Self::PortForward),
69            _ => None,
70        }
71    }
72
73    pub fn to_byte(self) -> u8 {
74        self as u8
75    }
76}
77
78/// Metadata for one session, returned in SessionInfo.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct SessionEntry {
81    pub id: String,
82    pub name: String,
83    pub pty_path: String,
84    pub shell_pid: u32,
85    pub created_at: u64,
86    pub attached: bool,
87    pub last_heartbeat: u64,
88    pub foreground_cmd: String,
89}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub enum Frame {
93    Data(Bytes),
94    Resize {
95        cols: u16,
96        rows: u16,
97    },
98    Exit {
99        code: i32,
100    },
101    /// Sent to a client when another client takes over the session.
102    Detached,
103    /// Heartbeat request (client → server).
104    Ping,
105    /// Heartbeat reply (server → client).
106    Pong,
107    /// Environment variables (client → server, sent before first Resize on new session).
108    Env {
109        vars: Vec<(String, String)>,
110    },
111    /// Client signals it can handle agent forwarding (client → server).
112    AgentForward,
113    /// New agent connection on the remote side (server → client).
114    AgentOpen {
115        channel_id: u32,
116    },
117    /// Agent protocol data (bidirectional).
118    AgentData {
119        channel_id: u32,
120        data: Bytes,
121    },
122    /// Close an agent channel (bidirectional).
123    AgentClose {
124        channel_id: u32,
125    },
126    /// Client signals it can handle URL open forwarding (client → server).
127    OpenForward,
128    /// URL to open on the client machine (server → client).
129    OpenUrl {
130        url: String,
131    },
132    /// Server asks client to bind a local TCP port for reverse tunneling (server → client).
133    TunnelListen {
134        port: u16,
135    },
136    /// Client signals a tunnel connection has been accepted (client → server).
137    TunnelOpen {
138        channel_id: u32,
139    },
140    /// Tunnel data relay (bidirectional).
141    TunnelData {
142        channel_id: u32,
143        data: Bytes,
144    },
145    /// Tunnel connection closed (bidirectional).
146    TunnelClose {
147        channel_id: u32,
148    },
149    /// Server notifies attached client that a file transfer started (server → client).
150    SendOffer {
151        file_count: u32,
152        total_bytes: u64,
153    },
154    /// Server notifies attached client that a file transfer completed (server → client).
155    SendDone,
156    /// File transfer cancelled (server → client).
157    SendCancel {
158        reason: String,
159    },
160    /// Server asks client to set up a port forward listener (server → client for remote-fwd).
161    PortForwardListen {
162        forward_id: u32,
163        listen_port: u16,
164        target_port: u16,
165    },
166    /// Client confirms port forward listener is ready (client → server).
167    PortForwardReady {
168        forward_id: u32,
169    },
170    /// New TCP connection on a port forward (bidirectional).
171    PortForwardOpen {
172        forward_id: u32,
173        channel_id: u32,
174        target_port: u16,
175    },
176    /// Port forward channel data (bidirectional).
177    PortForwardData {
178        channel_id: u32,
179        data: Bytes,
180    },
181    /// Close a port forward channel (bidirectional).
182    PortForwardClose {
183        channel_id: u32,
184    },
185    /// Tear down an entire port forward (server → client).
186    PortForwardStop {
187        forward_id: u32,
188    },
189    /// Protocol version handshake (client → server, first frame on connection).
190    Hello {
191        version: u16,
192    },
193    /// Protocol version acknowledgement (server → client).
194    HelloAck {
195        version: u16,
196    },
197    // Control requests
198    /// Local-side file transfer routing (client → daemon).
199    SendFile {
200        session: String,
201        role: u8,
202    },
203    NewSession {
204        name: String,
205        command: String,
206    },
207    Attach {
208        session: String,
209    },
210    /// Read-only tail of a session's PTY output (client → server).
211    Tail {
212        session: String,
213    },
214    ListSessions,
215    KillSession {
216        session: String,
217    },
218    KillServer,
219    RenameSession {
220        session: String,
221        new_name: String,
222    },
223    // Control responses
224    SessionCreated {
225        id: String,
226    },
227    SessionInfo {
228        sessions: Vec<SessionEntry>,
229    },
230    Ok,
231    Error {
232        message: String,
233    },
234}
235
236impl Frame {
237    /// Extract a Frame from a `framed.next().await` result, converting
238    /// the common None / Some(Err) cases into descriptive errors.
239    pub fn expect_from(result: Option<Result<Frame, io::Error>>) -> anyhow::Result<Frame> {
240        match result {
241            Some(Ok(frame)) => Ok(frame),
242            Some(Err(e)) => Err(anyhow::anyhow!("daemon protocol error: {e}")),
243            None => Err(anyhow::anyhow!("daemon closed connection")),
244        }
245    }
246}
247
248pub struct FrameCodec;
249
250fn encode_empty(dst: &mut BytesMut, ty: u8) {
251    dst.put_u8(ty);
252    dst.put_u32(0);
253}
254
255fn encode_str(dst: &mut BytesMut, ty: u8, s: &str) {
256    dst.put_u8(ty);
257    dst.put_u32(s.len() as u32);
258    dst.extend_from_slice(s.as_bytes());
259}
260
261fn encode_blob(dst: &mut BytesMut, ty: u8, data: &[u8]) {
262    dst.put_u8(ty);
263    dst.put_u32(data.len() as u32);
264    dst.extend_from_slice(data);
265}
266
267fn encode_prefix_blob(dst: &mut BytesMut, ty: u8, prefix: u32, data: &[u8]) {
268    dst.put_u8(ty);
269    dst.put_u32(4 + data.len() as u32);
270    dst.put_u32(prefix);
271    dst.extend_from_slice(data);
272}
273
274fn encode_env(dst: &mut BytesMut, vars: &[(String, String)]) {
275    let body_len: usize = 4 + vars.iter().map(|(k, v)| 2 + k.len() + 2 + v.len()).sum::<usize>();
276    dst.put_u8(TYPE_ENV);
277    dst.put_u32(body_len as u32);
278    dst.put_u32(vars.len() as u32);
279    for (k, v) in vars {
280        dst.put_u16(k.len() as u16);
281        dst.extend_from_slice(k.as_bytes());
282        dst.put_u16(v.len() as u16);
283        dst.extend_from_slice(v.as_bytes());
284    }
285}
286
287fn encode_session_info(dst: &mut BytesMut, sessions: &[SessionEntry]) {
288    let body_len: usize = 4 + sessions
289        .iter()
290        .map(|e| {
291            2 + e.id.len()
292                + 2
293                + e.name.len()
294                + 2
295                + e.pty_path.len()
296                + 21
297                + 2
298                + e.foreground_cmd.len()
299        })
300        .sum::<usize>();
301    dst.put_u8(TYPE_SESSION_INFO);
302    dst.put_u32(body_len as u32);
303    dst.put_u32(sessions.len() as u32);
304    for e in sessions {
305        dst.put_u16(e.id.len() as u16);
306        dst.extend_from_slice(e.id.as_bytes());
307        dst.put_u16(e.name.len() as u16);
308        dst.extend_from_slice(e.name.as_bytes());
309        dst.put_u16(e.pty_path.len() as u16);
310        dst.extend_from_slice(e.pty_path.as_bytes());
311        dst.put_u32(e.shell_pid);
312        dst.put_u64(e.created_at);
313        dst.put_u8(if e.attached { 1 } else { 0 });
314        dst.put_u64(e.last_heartbeat);
315        dst.put_u16(e.foreground_cmd.len() as u16);
316        dst.extend_from_slice(e.foreground_cmd.as_bytes());
317    }
318}
319
320fn decode_string(payload: BytesMut) -> Result<String, io::Error> {
321    String::from_utf8(payload.to_vec()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
322}
323
324fn expect_len(payload: &BytesMut, expected: usize, name: &str) -> Result<(), io::Error> {
325    if payload.len() != expected {
326        return Err(io::Error::new(
327            io::ErrorKind::InvalidData,
328            format!("{name} frame must be {expected} bytes"),
329        ));
330    }
331    Ok(())
332}
333
334fn expect_min_len(payload: &BytesMut, min: usize, name: &str) -> Result<(), io::Error> {
335    if payload.len() < min {
336        return Err(io::Error::new(
337            io::ErrorKind::InvalidData,
338            format!("{name} frame must be at least {min} bytes"),
339        ));
340    }
341    Ok(())
342}
343
344fn read_u16(payload: &[u8], offset: usize) -> u16 {
345    u16::from_be_bytes([payload[offset], payload[offset + 1]])
346}
347
348fn read_u32(payload: &[u8], offset: usize) -> u32 {
349    u32::from_be_bytes([
350        payload[offset],
351        payload[offset + 1],
352        payload[offset + 2],
353        payload[offset + 3],
354    ])
355}
356
357fn read_i32(payload: &[u8], offset: usize) -> i32 {
358    i32::from_be_bytes([
359        payload[offset],
360        payload[offset + 1],
361        payload[offset + 2],
362        payload[offset + 3],
363    ])
364}
365
366fn read_u64(payload: &[u8], offset: usize) -> u64 {
367    u64::from_be_bytes([
368        payload[offset],
369        payload[offset + 1],
370        payload[offset + 2],
371        payload[offset + 3],
372        payload[offset + 4],
373        payload[offset + 5],
374        payload[offset + 6],
375        payload[offset + 7],
376    ])
377}
378
379/// Auto-offset-tracking reader for decoding fixed-field payloads.
380struct PayloadReader<'a> {
381    data: &'a [u8],
382    off: usize,
383}
384
385impl<'a> PayloadReader<'a> {
386    fn new(data: &'a [u8]) -> Self {
387        Self { data, off: 0 }
388    }
389
390    fn u16(&mut self) -> u16 {
391        let v = read_u16(self.data, self.off);
392        self.off += 2;
393        v
394    }
395
396    fn u32(&mut self) -> u32 {
397        let v = read_u32(self.data, self.off);
398        self.off += 4;
399        v
400    }
401
402    fn i32(&mut self) -> i32 {
403        let v = read_i32(self.data, self.off);
404        self.off += 4;
405        v
406    }
407
408    fn u64(&mut self) -> u64 {
409        let v = read_u64(self.data, self.off);
410        self.off += 8;
411        v
412    }
413
414    fn offset(&self) -> usize {
415        self.off
416    }
417}
418
419/// Encode a fixed-field frame: writes type byte, auto-computes payload length, writes fields.
420macro_rules! encode_fields {
421    ($dst:expr, $ty:expr $(, $val:expr => $method:ident)*) => {{
422        let payload_len: u32 = 0 $(+ encode_fields!(@size $method))*;
423        $dst.put_u8($ty);
424        $dst.put_u32(payload_len);
425        $($dst.$method($val);)*
426    }};
427    (@size put_u8) => { 1 };
428    (@size put_u16) => { 2 };
429    (@size put_u32) => { 4 };
430    (@size put_i32) => { 4 };
431    (@size put_u64) => { 8 };
432}
433
434fn decode_env(payload: BytesMut) -> Result<Option<Frame>, io::Error> {
435    let p = &payload[..];
436    if p.len() < 4 {
437        return Err(io::Error::new(io::ErrorKind::InvalidData, "env frame too short"));
438    }
439    let count = read_u32(p, 0) as usize;
440    let mut off = 4;
441    let mut vars = Vec::with_capacity(count.min(1024));
442    for _ in 0..count {
443        if off + 2 > p.len() {
444            return Err(io::Error::new(io::ErrorKind::InvalidData, "env frame truncated"));
445        }
446        let klen = read_u16(p, off) as usize;
447        off += 2;
448        if off + klen + 2 > p.len() {
449            return Err(io::Error::new(io::ErrorKind::InvalidData, "env frame truncated"));
450        }
451        let key = String::from_utf8(p[off..off + klen].to_vec())
452            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
453        off += klen;
454        let vlen = read_u16(p, off) as usize;
455        off += 2;
456        if off + vlen > p.len() {
457            return Err(io::Error::new(io::ErrorKind::InvalidData, "env frame truncated"));
458        }
459        let val = String::from_utf8(p[off..off + vlen].to_vec())
460            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
461        off += vlen;
462        vars.push((key, val));
463    }
464    Ok(Some(Frame::Env { vars }))
465}
466
467fn decode_session_info(payload: BytesMut) -> Result<Option<Frame>, io::Error> {
468    let p = &payload[..];
469    if p.len() < 4 {
470        return Err(io::Error::new(io::ErrorKind::InvalidData, "session info frame too short"));
471    }
472    let count = read_u32(p, 0) as usize;
473    let mut off = 4;
474    let mut sessions = Vec::with_capacity(count.min(1024));
475    let read_str = |p: &[u8], off: &mut usize| -> Result<String, io::Error> {
476        if *off + 2 > p.len() {
477            return Err(io::Error::new(io::ErrorKind::InvalidData, "session info truncated"));
478        }
479        let len = read_u16(p, *off) as usize;
480        *off += 2;
481        if *off + len > p.len() {
482            return Err(io::Error::new(io::ErrorKind::InvalidData, "session info truncated"));
483        }
484        let s = String::from_utf8(p[*off..*off + len].to_vec())
485            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
486        *off += len;
487        Ok(s)
488    };
489    for _ in 0..count {
490        let id = read_str(p, &mut off)?;
491        let name = read_str(p, &mut off)?;
492        let pty_path = read_str(p, &mut off)?;
493        // Fixed fields: shell_pid(4) + created_at(8) + attached(1) + last_heartbeat(8) = 21
494        if off + 21 > p.len() {
495            return Err(io::Error::new(io::ErrorKind::InvalidData, "session info truncated"));
496        }
497        let shell_pid = read_u32(p, off);
498        off += 4;
499        let created_at = read_u64(p, off);
500        off += 8;
501        let attached = p[off] != 0;
502        off += 1;
503        let last_heartbeat = read_u64(p, off);
504        off += 8;
505        // Optional field: foreground_cmd (backwards compat -- empty if absent)
506        let foreground_cmd = if off + 2 <= p.len() {
507            read_str(p, &mut off).unwrap_or_default()
508        } else {
509            String::new()
510        };
511        sessions.push(SessionEntry {
512            id,
513            name,
514            pty_path,
515            shell_pid,
516            created_at,
517            attached,
518            last_heartbeat,
519            foreground_cmd,
520        });
521    }
522    Ok(Some(Frame::SessionInfo { sessions }))
523}
524
525impl Decoder for FrameCodec {
526    type Item = Frame;
527    type Error = io::Error;
528
529    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Frame>, io::Error> {
530        if src.len() < HEADER_LEN {
531            return Ok(None);
532        }
533
534        let frame_type = src[0];
535        let payload_len = u32::from_be_bytes([src[1], src[2], src[3], src[4]]) as usize;
536
537        if payload_len > MAX_FRAME_SIZE {
538            return Err(io::Error::new(
539                io::ErrorKind::InvalidData,
540                format!("frame payload too large: {payload_len} bytes (max {MAX_FRAME_SIZE})"),
541            ));
542        }
543
544        if src.len() < HEADER_LEN + payload_len {
545            src.reserve(HEADER_LEN + payload_len - src.len());
546            return Ok(None);
547        }
548
549        src.advance(HEADER_LEN);
550        let payload = src.split_to(payload_len);
551
552        match frame_type {
553            // Blob frames
554            TYPE_DATA => Ok(Some(Frame::Data(payload.freeze()))),
555
556            // Fixed-field frames (PayloadReader auto-tracks offsets)
557            TYPE_RESIZE => {
558                expect_len(&payload, 4, "resize")?;
559                let mut r = PayloadReader::new(&payload);
560                Ok(Some(Frame::Resize { cols: r.u16(), rows: r.u16() }))
561            }
562            TYPE_EXIT => {
563                expect_len(&payload, 4, "exit")?;
564                Ok(Some(Frame::Exit { code: PayloadReader::new(&payload).i32() }))
565            }
566            TYPE_HELLO => {
567                expect_len(&payload, 2, "hello")?;
568                Ok(Some(Frame::Hello { version: PayloadReader::new(&payload).u16() }))
569            }
570            TYPE_HELLO_ACK => {
571                expect_len(&payload, 2, "hello ack")?;
572                Ok(Some(Frame::HelloAck { version: PayloadReader::new(&payload).u16() }))
573            }
574            TYPE_AGENT_OPEN => {
575                expect_len(&payload, 4, "agent open")?;
576                Ok(Some(Frame::AgentOpen { channel_id: PayloadReader::new(&payload).u32() }))
577            }
578            TYPE_AGENT_CLOSE => {
579                expect_len(&payload, 4, "agent close")?;
580                Ok(Some(Frame::AgentClose { channel_id: PayloadReader::new(&payload).u32() }))
581            }
582            TYPE_TUNNEL_LISTEN => {
583                expect_len(&payload, 2, "tunnel listen")?;
584                Ok(Some(Frame::TunnelListen { port: PayloadReader::new(&payload).u16() }))
585            }
586            TYPE_TUNNEL_OPEN => {
587                expect_len(&payload, 4, "tunnel open")?;
588                Ok(Some(Frame::TunnelOpen { channel_id: PayloadReader::new(&payload).u32() }))
589            }
590            TYPE_TUNNEL_CLOSE => {
591                expect_len(&payload, 4, "tunnel close")?;
592                Ok(Some(Frame::TunnelClose { channel_id: PayloadReader::new(&payload).u32() }))
593            }
594            TYPE_SEND_OFFER => {
595                expect_len(&payload, 12, "send offer")?;
596                let mut r = PayloadReader::new(&payload);
597                Ok(Some(Frame::SendOffer { file_count: r.u32(), total_bytes: r.u64() }))
598            }
599            TYPE_PORT_FORWARD_LISTEN => {
600                expect_len(&payload, 8, "port forward listen")?;
601                let mut r = PayloadReader::new(&payload);
602                Ok(Some(Frame::PortForwardListen {
603                    forward_id: r.u32(),
604                    listen_port: r.u16(),
605                    target_port: r.u16(),
606                }))
607            }
608            TYPE_PORT_FORWARD_READY => {
609                expect_len(&payload, 4, "port forward ready")?;
610                Ok(Some(Frame::PortForwardReady { forward_id: PayloadReader::new(&payload).u32() }))
611            }
612            TYPE_PORT_FORWARD_OPEN => {
613                expect_len(&payload, 10, "port forward open")?;
614                let mut r = PayloadReader::new(&payload);
615                Ok(Some(Frame::PortForwardOpen {
616                    forward_id: r.u32(),
617                    channel_id: r.u32(),
618                    target_port: r.u16(),
619                }))
620            }
621            TYPE_PORT_FORWARD_CLOSE => {
622                expect_len(&payload, 4, "port forward close")?;
623                Ok(Some(Frame::PortForwardClose { channel_id: PayloadReader::new(&payload).u32() }))
624            }
625            TYPE_PORT_FORWARD_STOP => {
626                expect_len(&payload, 4, "port forward stop")?;
627                Ok(Some(Frame::PortForwardStop { forward_id: PayloadReader::new(&payload).u32() }))
628            }
629
630            // Prefix + blob frames (fixed header, trailing bytes)
631            TYPE_AGENT_DATA => {
632                expect_min_len(&payload, 4, "agent data")?;
633                let mut r = PayloadReader::new(&payload);
634                let channel_id = r.u32();
635                let off = r.offset();
636                Ok(Some(Frame::AgentData { channel_id, data: payload.freeze().slice(off..) }))
637            }
638            TYPE_TUNNEL_DATA => {
639                expect_min_len(&payload, 4, "tunnel data")?;
640                let mut r = PayloadReader::new(&payload);
641                let channel_id = r.u32();
642                let off = r.offset();
643                Ok(Some(Frame::TunnelData { channel_id, data: payload.freeze().slice(off..) }))
644            }
645            TYPE_PORT_FORWARD_DATA => {
646                expect_min_len(&payload, 4, "port forward data")?;
647                let mut r = PayloadReader::new(&payload);
648                let channel_id = r.u32();
649                let off = r.offset();
650                Ok(Some(Frame::PortForwardData { channel_id, data: payload.freeze().slice(off..) }))
651            }
652
653            // Empty frames
654            TYPE_DETACHED => Ok(Some(Frame::Detached)),
655            TYPE_PING => Ok(Some(Frame::Ping)),
656            TYPE_PONG => Ok(Some(Frame::Pong)),
657            TYPE_AGENT_FORWARD => Ok(Some(Frame::AgentForward)),
658            TYPE_OPEN_FORWARD => Ok(Some(Frame::OpenForward)),
659            TYPE_SEND_DONE => Ok(Some(Frame::SendDone)),
660            TYPE_LIST_SESSIONS => Ok(Some(Frame::ListSessions)),
661            TYPE_KILL_SERVER => Ok(Some(Frame::KillServer)),
662            TYPE_OK => Ok(Some(Frame::Ok)),
663
664            // String frames
665            TYPE_OPEN_URL => Ok(Some(Frame::OpenUrl { url: decode_string(payload)? })),
666            TYPE_SEND_CANCEL => Ok(Some(Frame::SendCancel { reason: decode_string(payload)? })),
667            TYPE_NEW_SESSION => {
668                if payload.len() < 2 {
669                    return Ok(Some(Frame::NewSession {
670                        name: String::new(),
671                        command: String::new(),
672                    }));
673                }
674                let name_len = read_u16(&payload, 0) as usize;
675                if 2 + name_len > payload.len() {
676                    return Err(io::Error::new(
677                        io::ErrorKind::InvalidData,
678                        "new session frame truncated",
679                    ));
680                }
681                let name = String::from_utf8(payload[2..2 + name_len].to_vec())
682                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
683                let command = String::from_utf8(payload[2 + name_len..].to_vec())
684                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
685                Ok(Some(Frame::NewSession { name, command }))
686            }
687            TYPE_ATTACH => Ok(Some(Frame::Attach { session: decode_string(payload)? })),
688            TYPE_TAIL => Ok(Some(Frame::Tail { session: decode_string(payload)? })),
689            TYPE_KILL_SESSION => Ok(Some(Frame::KillSession { session: decode_string(payload)? })),
690            TYPE_RENAME_SESSION => {
691                if payload.len() < 2 {
692                    return Err(io::Error::new(
693                        io::ErrorKind::InvalidData,
694                        "rename session frame too short",
695                    ));
696                }
697                let session_len = read_u16(&payload, 0) as usize;
698                if 2 + session_len > payload.len() {
699                    return Err(io::Error::new(
700                        io::ErrorKind::InvalidData,
701                        "rename session frame truncated",
702                    ));
703                }
704                let session = String::from_utf8(payload[2..2 + session_len].to_vec())
705                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
706                let new_name = String::from_utf8(payload[2 + session_len..].to_vec())
707                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
708                Ok(Some(Frame::RenameSession { session, new_name }))
709            }
710            TYPE_SESSION_CREATED => Ok(Some(Frame::SessionCreated { id: decode_string(payload)? })),
711            TYPE_ERROR => Ok(Some(Frame::Error { message: decode_string(payload)? })),
712
713            // Custom frames
714            TYPE_ENV => decode_env(payload),
715            TYPE_SESSION_INFO => decode_session_info(payload),
716            TYPE_SEND_FILE => {
717                expect_min_len(&payload, 1, "send file")?;
718                let role = payload[payload.len() - 1];
719                let session = String::from_utf8(payload[..payload.len() - 1].to_vec())
720                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
721                Ok(Some(Frame::SendFile { session, role }))
722            }
723
724            _ => Err(io::Error::new(
725                io::ErrorKind::InvalidData,
726                format!("unknown frame type: 0x{frame_type:02x}"),
727            )),
728        }
729    }
730}
731
732impl Encoder<Frame> for FrameCodec {
733    type Error = io::Error;
734
735    fn encode(&mut self, frame: Frame, dst: &mut BytesMut) -> Result<(), io::Error> {
736        match frame {
737            // Blob frames
738            Frame::Data(data) => encode_blob(dst, TYPE_DATA, &data),
739
740            // Fixed-field frames (encode_fields! auto-computes payload length)
741            Frame::Resize { cols, rows } => {
742                encode_fields!(dst, TYPE_RESIZE, cols => put_u16, rows => put_u16);
743            }
744            Frame::Exit { code } => {
745                encode_fields!(dst, TYPE_EXIT, code => put_i32);
746            }
747            Frame::Hello { version } => {
748                encode_fields!(dst, TYPE_HELLO, version => put_u16);
749            }
750            Frame::HelloAck { version } => {
751                encode_fields!(dst, TYPE_HELLO_ACK, version => put_u16);
752            }
753            Frame::AgentOpen { channel_id } => {
754                encode_fields!(dst, TYPE_AGENT_OPEN, channel_id => put_u32);
755            }
756            Frame::AgentClose { channel_id } => {
757                encode_fields!(dst, TYPE_AGENT_CLOSE, channel_id => put_u32);
758            }
759            Frame::TunnelListen { port } => {
760                encode_fields!(dst, TYPE_TUNNEL_LISTEN, port => put_u16);
761            }
762            Frame::TunnelOpen { channel_id } => {
763                encode_fields!(dst, TYPE_TUNNEL_OPEN, channel_id => put_u32);
764            }
765            Frame::TunnelClose { channel_id } => {
766                encode_fields!(dst, TYPE_TUNNEL_CLOSE, channel_id => put_u32);
767            }
768            Frame::SendOffer { file_count, total_bytes } => {
769                encode_fields!(dst, TYPE_SEND_OFFER, file_count => put_u32, total_bytes => put_u64);
770            }
771            Frame::PortForwardListen { forward_id, listen_port, target_port } => {
772                encode_fields!(dst, TYPE_PORT_FORWARD_LISTEN,
773                    forward_id => put_u32, listen_port => put_u16, target_port => put_u16);
774            }
775            Frame::PortForwardReady { forward_id } => {
776                encode_fields!(dst, TYPE_PORT_FORWARD_READY, forward_id => put_u32);
777            }
778            Frame::PortForwardOpen { forward_id, channel_id, target_port } => {
779                encode_fields!(dst, TYPE_PORT_FORWARD_OPEN,
780                    forward_id => put_u32, channel_id => put_u32, target_port => put_u16);
781            }
782            Frame::PortForwardClose { channel_id } => {
783                encode_fields!(dst, TYPE_PORT_FORWARD_CLOSE, channel_id => put_u32);
784            }
785            Frame::PortForwardStop { forward_id } => {
786                encode_fields!(dst, TYPE_PORT_FORWARD_STOP, forward_id => put_u32);
787            }
788
789            // Prefix + blob frames
790            Frame::AgentData { channel_id, data } => {
791                encode_prefix_blob(dst, TYPE_AGENT_DATA, channel_id, &data);
792            }
793            Frame::TunnelData { channel_id, data } => {
794                encode_prefix_blob(dst, TYPE_TUNNEL_DATA, channel_id, &data);
795            }
796            Frame::PortForwardData { channel_id, data } => {
797                encode_prefix_blob(dst, TYPE_PORT_FORWARD_DATA, channel_id, &data);
798            }
799
800            // Empty frames
801            Frame::Detached => encode_empty(dst, TYPE_DETACHED),
802            Frame::Ping => encode_empty(dst, TYPE_PING),
803            Frame::Pong => encode_empty(dst, TYPE_PONG),
804            Frame::AgentForward => encode_empty(dst, TYPE_AGENT_FORWARD),
805            Frame::OpenForward => encode_empty(dst, TYPE_OPEN_FORWARD),
806            Frame::SendDone => encode_empty(dst, TYPE_SEND_DONE),
807            Frame::ListSessions => encode_empty(dst, TYPE_LIST_SESSIONS),
808            Frame::KillServer => encode_empty(dst, TYPE_KILL_SERVER),
809            Frame::Ok => encode_empty(dst, TYPE_OK),
810
811            // String frames
812            Frame::OpenUrl { url } => encode_str(dst, TYPE_OPEN_URL, &url),
813            Frame::SendCancel { reason } => encode_str(dst, TYPE_SEND_CANCEL, &reason),
814            Frame::NewSession { name, command } => {
815                let name_bytes = name.as_bytes();
816                let cmd_bytes = command.as_bytes();
817                let payload_len = 2 + name_bytes.len() + cmd_bytes.len();
818                dst.put_u8(TYPE_NEW_SESSION);
819                dst.put_u32(payload_len as u32);
820                dst.put_u16(name_bytes.len() as u16);
821                dst.extend_from_slice(name_bytes);
822                dst.extend_from_slice(cmd_bytes);
823            }
824            Frame::Attach { session } => encode_str(dst, TYPE_ATTACH, &session),
825            Frame::Tail { session } => encode_str(dst, TYPE_TAIL, &session),
826            Frame::KillSession { session } => encode_str(dst, TYPE_KILL_SESSION, &session),
827            Frame::RenameSession { session, new_name } => {
828                let session_bytes = session.as_bytes();
829                let name_bytes = new_name.as_bytes();
830                let payload_len = 2 + session_bytes.len() + name_bytes.len();
831                dst.put_u8(TYPE_RENAME_SESSION);
832                dst.put_u32(payload_len as u32);
833                dst.put_u16(session_bytes.len() as u16);
834                dst.extend_from_slice(session_bytes);
835                dst.extend_from_slice(name_bytes);
836            }
837            Frame::SessionCreated { id } => encode_str(dst, TYPE_SESSION_CREATED, &id),
838            Frame::Error { message } => encode_str(dst, TYPE_ERROR, &message),
839
840            // Custom frames
841            Frame::Env { vars } => encode_env(dst, &vars),
842            Frame::SessionInfo { sessions } => encode_session_info(dst, &sessions),
843            Frame::SendFile { session, role } => {
844                dst.put_u8(TYPE_SEND_FILE);
845                dst.put_u32((session.len() + 1) as u32);
846                dst.extend_from_slice(session.as_bytes());
847                dst.put_u8(role);
848            }
849        }
850        Ok(())
851    }
852}