job_security_protocol/
lib.rs

1use std::{ffi::OsString, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use serde::{Deserialize, Serialize};
5
6#[derive(Serialize, Deserialize, Debug, Clone)]
7pub enum Request {
8    /// Allocate a new terminal and start `command`.
9    Start {
10        command: Vec<OsString>,
11        env:     Vec<(OsString, OsString)>,
12        pwd:     PathBuf,
13        rows:    u16,
14        cols:    u16,
15    },
16
17    ListProcesses,
18    Quit,
19
20    /// Restart the stopped foreground process in the given terminal.
21    /// Has no effect if the process is not stopped. If `id` is None, resumes
22    /// the last stopped terminal.
23    Resume {
24        id:          Option<u32>,
25        /// Whether the server should sent over the outputs it has accumulated
26        with_output: bool,
27    },
28
29    /// Window size has changed. Should be sent by the client in response to
30    /// the SIGWINCH signal.
31    WindowSize {
32        id:   u32,
33        rows: u16,
34        cols: u16,
35    },
36}
37
38#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
39pub enum ExitStatus {
40    Exited(i32),
41    Signaled(i32),
42}
43
44#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
45pub enum ProcessState {
46    Running,
47    Stopped,
48    Terminated(ExitStatus),
49}
50
51impl ProcessState {
52    pub fn is_terminated(&self) -> bool {
53        matches!(self, ProcessState::Terminated(_))
54    }
55}
56
57#[derive(Serialize, Deserialize, Debug, Clone)]
58pub struct Process {
59    pub id:        u32,
60    pub pid:       u32,
61    pub state:     ProcessState,
62    pub connected: bool,
63    pub command:   OsString,
64}
65
66#[derive(Serialize, Deserialize, Debug, Clone, thiserror::Error)]
67pub enum Error {
68    #[error("Job {id:?} not found")]
69    NotFound { id: Option<u32> },
70
71    #[error("Job {id} is already foreground somewhere else")]
72    AlreadyConnected { id: u32 },
73
74    #[error("Client sent an invalid request")]
75    InvalidRequest,
76}
77
78#[derive(Serialize, Deserialize, Debug, Clone)]
79pub enum Event {
80    /// Process in the given terminal changed state, i.e. exist, stopped,
81    /// started, or killed. When this is sent in response to a Start
82    /// request, `id` is a newly allocated identifier.
83    StateChanged {
84        /// The unique identifier for the terminal.
85        id:    u32,
86        state: ProcessState,
87    },
88
89    /// Sent on `Resume`, report the current terminal size
90    WindowSize {
91        cols: u16,
92        rows: u16,
93    },
94
95    Error(Error),
96
97    /// Responses for the ListProcesses request
98    Process(Process),
99}
100
101pub use bytes;
102
103pub struct MapCodec<T, EI, DO, R, W, FD, FE> {
104    inner:      T,
105    map_decode: FD,
106    map_encode: FE,
107    #[allow(clippy::type_complexity)]
108    _marker:    std::marker::PhantomData<(fn(DO) -> R, fn(W) -> EI)>,
109}
110
111impl<T, EI, DO, R, W, FD, FE> MapCodec<T, EI, DO, R, W, FD, FE> {
112    pub fn new(inner: T, map_decode: FD, map_encode: FE) -> Self {
113        Self {
114            inner,
115            map_decode,
116            map_encode,
117            _marker: std::marker::PhantomData,
118        }
119    }
120}
121
122impl<T, EI, DO, R, W, FD: FnMut(DO) -> R, FE> tokio_util::codec::Decoder
123    for MapCodec<T, EI, DO, R, W, FD, FE>
124where
125    T: tokio_util::codec::Decoder<Item = DO>,
126{
127    type Error = T::Error;
128    type Item = R;
129
130    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
131        Ok(self.inner.decode(src)?.map(|item| (self.map_decode)(item)))
132    }
133}
134
135impl<T, EI, DO, R, W, FD, FE: FnMut(W) -> EI> tokio_util::codec::Encoder<W>
136    for MapCodec<T, EI, DO, R, W, FD, FE>
137where
138    T: tokio_util::codec::Encoder<EI>,
139{
140    type Error = T::Error;
141
142    fn encode(&mut self, item: W, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
143        self.inner.encode((self.map_encode)(item), dst)
144    }
145}
146pub fn client_codec() -> impl tokio_util::codec::Encoder<Request, Error = std::io::Error>
147       + tokio_util::codec::Decoder<Item = Event, Error = std::io::Error> {
148    let codec = tokio_util::codec::length_delimited::LengthDelimitedCodec::new();
149    MapCodec::new(
150        codec,
151        |bytes: BytesMut| bincode::deserialize(&bytes).unwrap(),
152        |event| -> Bytes { bincode::serialize(&event).unwrap().into() },
153    )
154}
155
156pub fn server_codec() -> impl tokio_util::codec::Encoder<Event, Error = std::io::Error>
157       + tokio_util::codec::Decoder<Item = Request, Error = std::io::Error> {
158    let codec = tokio_util::codec::length_delimited::LengthDelimitedCodec::new();
159    MapCodec::new(
160        codec,
161        |bytes: BytesMut| bincode::deserialize(&bytes).unwrap(),
162        |request| -> Bytes { bincode::serialize(&request).unwrap().into() },
163    )
164}
165
166pub trait Codec<Input, Output, Error> {
167    fn as_encoder(&mut self) -> &mut dyn tokio_util::codec::Encoder<Input, Error = Error>;
168    fn as_decoder(&mut self) -> &mut dyn tokio_util::codec::Decoder<Item = Output, Error = Error>;
169}
170
171impl<T, I, O, E> Codec<I, O, E> for T
172where
173    T: tokio_util::codec::Encoder<I, Error = E> + tokio_util::codec::Decoder<Item = O, Error = E>,
174{
175    fn as_encoder(&mut self) -> &mut dyn tokio_util::codec::Encoder<I, Error = E> {
176        self as _
177    }
178
179    fn as_decoder(&mut self) -> &mut dyn tokio_util::codec::Decoder<Item = O, Error = E> {
180        self as _
181    }
182}
183
184impl<I, O, E: From<std::io::Error>> tokio_util::codec::Encoder<I>
185    for Box<dyn Codec<I, O, E> + Send + Sync + Unpin>
186{
187    type Error = E;
188
189    fn encode(&mut self, item: I, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
190        self.as_mut().as_encoder().encode(item, dst)
191    }
192}
193
194impl<I, O, E: From<std::io::Error>> tokio_util::codec::Decoder
195    for Box<dyn Codec<I, O, E> + Send + Sync + Unpin>
196{
197    type Error = E;
198    type Item = O;
199
200    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
201        self.as_mut().as_decoder().decode(src)
202    }
203
204    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
205        self.as_mut().as_decoder().decode_eof(buf)
206    }
207}
208
209pub type DynServerCodec = Box<dyn Codec<Event, Request, std::io::Error> + Send + Sync + Unpin>;
210pub type DynClientCodec = Box<dyn Codec<Request, Event, std::io::Error> + Send + Sync + Unpin>;