Skip to main content

secure_exec_sidecar/
extension.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use crate::protocol::{
6    CloseStdinRequest, EventFrame, EventPayload, ExecuteRequest, ExtEnvelope,
7    GuestFilesystemCallRequest, GuestFilesystemResultResponse, KillProcessRequest, OwnershipScope,
8    ProcessKilledResponse, ProcessStartedResponse, SidecarRequestPayload, SidecarResponsePayload,
9    StdinClosedResponse, StdinWrittenResponse, WriteStdinRequest,
10};
11use crate::state::{SharedSidecarRequestClient, SidecarError};
12
13pub type ExtensionFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, SidecarError>> + 'a>>;
14
15pub trait ExtensionHost {
16    fn spawn_process<'a>(
17        &'a mut self,
18        ownership: OwnershipScope,
19        request: ExecuteRequest,
20    ) -> ExtensionFuture<'a, ProcessStartedResponse>;
21
22    fn write_stdin<'a>(
23        &'a mut self,
24        ownership: OwnershipScope,
25        request: WriteStdinRequest,
26    ) -> ExtensionFuture<'a, StdinWrittenResponse>;
27
28    fn close_stdin<'a>(
29        &'a mut self,
30        ownership: OwnershipScope,
31        request: CloseStdinRequest,
32    ) -> ExtensionFuture<'a, StdinClosedResponse>;
33
34    fn kill_process<'a>(
35        &'a mut self,
36        ownership: OwnershipScope,
37        request: KillProcessRequest,
38    ) -> ExtensionFuture<'a, ProcessKilledResponse>;
39
40    fn poll_event<'a>(
41        &'a mut self,
42        ownership: OwnershipScope,
43        timeout: Duration,
44    ) -> ExtensionFuture<'a, Option<EventFrame>>;
45
46    fn guest_filesystem_call<'a>(
47        &'a mut self,
48        ownership: OwnershipScope,
49        request: GuestFilesystemCallRequest,
50    ) -> ExtensionFuture<'a, GuestFilesystemResultResponse>;
51
52    fn bind_process_to_session<'a>(
53        &'a mut self,
54        ownership: OwnershipScope,
55        namespace: String,
56        ext_session_id: String,
57        process_id: String,
58    ) -> ExtensionFuture<'a, ()>;
59
60    fn bind_vm_to_session<'a>(
61        &'a mut self,
62        ownership: OwnershipScope,
63        namespace: String,
64        ext_session_id: String,
65    ) -> ExtensionFuture<'a, ()>;
66
67    fn dispose_session_resources<'a>(
68        &'a mut self,
69        ownership: OwnershipScope,
70        namespace: String,
71        ext_session_id: String,
72    ) -> ExtensionFuture<'a, Vec<EventFrame>>;
73
74    fn start_buffering_process_output<'a>(
75        &'a mut self,
76        ownership: OwnershipScope,
77        process_id: String,
78    ) -> ExtensionFuture<'a, ()>;
79
80    fn handoff_buffered_process_output<'a>(
81        &'a mut self,
82        ownership: OwnershipScope,
83        namespace: String,
84        ext_session_id: String,
85        process_id: String,
86        timeout: Duration,
87    ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput>;
88}
89
90#[derive(Debug, Clone, Default, PartialEq, Eq)]
91pub struct ExtensionBufferedProcessOutput {
92    pub stdout: Vec<u8>,
93    pub stderr: Vec<u8>,
94    pub stdout_truncated: bool,
95    pub stderr_truncated: bool,
96}
97
98impl ExtensionBufferedProcessOutput {
99    pub(crate) fn append_stdout(&mut self, chunk: &[u8], cap: usize) {
100        self.stdout_truncated |= append_bounded_bytes(&mut self.stdout, chunk, cap);
101    }
102
103    pub(crate) fn append_stderr(&mut self, chunk: &[u8], cap: usize) {
104        self.stderr_truncated |= append_bounded_bytes(&mut self.stderr, chunk, cap);
105    }
106}
107
108fn append_bounded_bytes(buffer: &mut Vec<u8>, chunk: &[u8], cap: usize) -> bool {
109    buffer.extend_from_slice(chunk);
110    if buffer.len() <= cap {
111        return false;
112    }
113    let remove_len = buffer.len() - cap;
114    buffer.drain(..remove_len);
115    true
116}
117
118#[derive(Debug, Clone)]
119pub struct ExtensionResponse {
120    pub payload: Vec<u8>,
121    pub events: Vec<EventFrame>,
122}
123
124impl ExtensionResponse {
125    pub fn new(payload: Vec<u8>) -> Self {
126        Self {
127            payload,
128            events: Vec::new(),
129        }
130    }
131
132    pub fn with_events(payload: Vec<u8>, events: Vec<EventFrame>) -> Self {
133        Self { payload, events }
134    }
135
136    pub fn with_wire_events(
137        payload: Vec<u8>,
138        events: Vec<crate::wire::EventFrame>,
139    ) -> Result<Self, SidecarError> {
140        let events = events
141            .into_iter()
142            .map(crate::wire::event_frame_to_compat)
143            .collect::<Result<Vec<_>, _>>()
144            .map_err(wire_protocol_error)?;
145        Ok(Self { payload, events })
146    }
147}
148
149#[derive(Clone)]
150pub struct ExtensionSnapshot {
151    namespace: String,
152    ownership: OwnershipScope,
153    sidecar_requests: SharedSidecarRequestClient,
154}
155
156pub struct ExtensionContext<'a> {
157    snapshot: ExtensionSnapshot,
158    host: &'a mut dyn ExtensionHost,
159}
160
161impl ExtensionSnapshot {
162    pub(crate) fn new(
163        namespace: String,
164        ownership: OwnershipScope,
165        sidecar_requests: SharedSidecarRequestClient,
166    ) -> Self {
167        Self {
168            namespace,
169            ownership,
170            sidecar_requests,
171        }
172    }
173
174    pub fn namespace(&self) -> &str {
175        &self.namespace
176    }
177
178    pub fn ownership(&self) -> &OwnershipScope {
179        &self.ownership
180    }
181
182    pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
183        EventFrame::new(
184            self.ownership.clone(),
185            EventPayload::Ext(ExtEnvelope {
186                namespace: self.namespace.clone(),
187                payload,
188            }),
189        )
190    }
191
192    pub fn ext_event_wire(
193        &self,
194        payload: Vec<u8>,
195    ) -> Result<crate::wire::EventFrame, SidecarError> {
196        crate::wire::event_frame_from_compat(self.ext_event(payload)).map_err(wire_protocol_error)
197    }
198
199    pub fn invoke_callback(
200        &self,
201        payload: Vec<u8>,
202        timeout: Duration,
203    ) -> Result<Vec<u8>, SidecarError> {
204        let response = self.sidecar_requests.invoke(
205            self.ownership.clone(),
206            SidecarRequestPayload::Ext(ExtEnvelope {
207                namespace: self.namespace.clone(),
208                payload,
209            }),
210            timeout,
211        )?;
212        extension_callback_response_payload(&self.namespace, response)
213    }
214}
215
216impl<'a> ExtensionContext<'a> {
217    pub(crate) fn new(snapshot: ExtensionSnapshot, host: &'a mut dyn ExtensionHost) -> Self {
218        Self { snapshot, host }
219    }
220
221    pub fn snapshot(&self) -> ExtensionSnapshot {
222        self.snapshot.clone()
223    }
224
225    pub fn namespace(&self) -> &str {
226        self.snapshot.namespace()
227    }
228
229    pub fn ownership(&self) -> &OwnershipScope {
230        self.snapshot.ownership()
231    }
232
233    pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
234        self.snapshot.ext_event(payload)
235    }
236
237    pub fn ext_event_wire(
238        &self,
239        payload: Vec<u8>,
240    ) -> Result<crate::wire::EventFrame, SidecarError> {
241        self.snapshot.ext_event_wire(payload)
242    }
243
244    pub fn invoke_callback(
245        &self,
246        payload: Vec<u8>,
247        timeout: Duration,
248    ) -> Result<Vec<u8>, SidecarError> {
249        self.snapshot.invoke_callback(payload, timeout)
250    }
251
252    pub async fn spawn_process(
253        &mut self,
254        request: ExecuteRequest,
255    ) -> Result<ProcessStartedResponse, SidecarError> {
256        self.host
257            .spawn_process(self.snapshot.ownership.clone(), request)
258            .await
259    }
260
261    pub async fn spawn_process_wire(
262        &mut self,
263        request: crate::wire::ExecuteRequest,
264    ) -> Result<crate::wire::ProcessStartedResponse, SidecarError> {
265        let payload = crate::wire::request_payload_to_compat(
266            self.snapshot.ownership(),
267            crate::wire::RequestPayload::ExecuteRequest(request),
268        )
269        .map_err(wire_protocol_error)?;
270        let crate::protocol::RequestPayload::Execute(request) = payload else {
271            return Err(unexpected_wire_request_payload("execute"));
272        };
273        let response = self.spawn_process(request).await?;
274        let payload = crate::wire::response_payload_from_compat(
275            self.snapshot.ownership(),
276            crate::protocol::ResponsePayload::ProcessStarted(response),
277        )
278        .map_err(wire_protocol_error)?;
279        let crate::wire::ResponsePayload::ProcessStartedResponse(response) = payload else {
280            return Err(unexpected_wire_response_payload("process started"));
281        };
282        Ok(response)
283    }
284
285    pub async fn write_stdin(
286        &mut self,
287        request: WriteStdinRequest,
288    ) -> Result<StdinWrittenResponse, SidecarError> {
289        self.host
290            .write_stdin(self.snapshot.ownership.clone(), request)
291            .await
292    }
293
294    pub async fn write_stdin_wire(
295        &mut self,
296        request: crate::wire::WriteStdinRequest,
297    ) -> Result<crate::wire::StdinWrittenResponse, SidecarError> {
298        let payload = crate::wire::request_payload_to_compat(
299            self.snapshot.ownership(),
300            crate::wire::RequestPayload::WriteStdinRequest(request),
301        )
302        .map_err(wire_protocol_error)?;
303        let crate::protocol::RequestPayload::WriteStdin(request) = payload else {
304            return Err(unexpected_wire_request_payload("write stdin"));
305        };
306        let response = self.write_stdin(request).await?;
307        let payload = crate::wire::response_payload_from_compat(
308            self.snapshot.ownership(),
309            crate::protocol::ResponsePayload::StdinWritten(response),
310        )
311        .map_err(wire_protocol_error)?;
312        let crate::wire::ResponsePayload::StdinWrittenResponse(response) = payload else {
313            return Err(unexpected_wire_response_payload("stdin written"));
314        };
315        Ok(response)
316    }
317
318    pub async fn close_stdin(
319        &mut self,
320        request: CloseStdinRequest,
321    ) -> Result<StdinClosedResponse, SidecarError> {
322        self.host
323            .close_stdin(self.snapshot.ownership.clone(), request)
324            .await
325    }
326
327    pub async fn close_stdin_wire(
328        &mut self,
329        request: crate::wire::CloseStdinRequest,
330    ) -> Result<crate::wire::StdinClosedResponse, SidecarError> {
331        let payload = crate::wire::request_payload_to_compat(
332            self.snapshot.ownership(),
333            crate::wire::RequestPayload::CloseStdinRequest(request),
334        )
335        .map_err(wire_protocol_error)?;
336        let crate::protocol::RequestPayload::CloseStdin(request) = payload else {
337            return Err(unexpected_wire_request_payload("close stdin"));
338        };
339        let response = self.close_stdin(request).await?;
340        let payload = crate::wire::response_payload_from_compat(
341            self.snapshot.ownership(),
342            crate::protocol::ResponsePayload::StdinClosed(response),
343        )
344        .map_err(wire_protocol_error)?;
345        let crate::wire::ResponsePayload::StdinClosedResponse(response) = payload else {
346            return Err(unexpected_wire_response_payload("stdin closed"));
347        };
348        Ok(response)
349    }
350
351    pub async fn kill_process(
352        &mut self,
353        request: KillProcessRequest,
354    ) -> Result<ProcessKilledResponse, SidecarError> {
355        self.host
356            .kill_process(self.snapshot.ownership.clone(), request)
357            .await
358    }
359
360    pub async fn kill_process_wire(
361        &mut self,
362        request: crate::wire::KillProcessRequest,
363    ) -> Result<crate::wire::ProcessKilledResponse, SidecarError> {
364        let payload = crate::wire::request_payload_to_compat(
365            self.snapshot.ownership(),
366            crate::wire::RequestPayload::KillProcessRequest(request),
367        )
368        .map_err(wire_protocol_error)?;
369        let crate::protocol::RequestPayload::KillProcess(request) = payload else {
370            return Err(unexpected_wire_request_payload("kill process"));
371        };
372        let response = self.kill_process(request).await?;
373        let payload = crate::wire::response_payload_from_compat(
374            self.snapshot.ownership(),
375            crate::protocol::ResponsePayload::ProcessKilled(response),
376        )
377        .map_err(wire_protocol_error)?;
378        let crate::wire::ResponsePayload::ProcessKilledResponse(response) = payload else {
379            return Err(unexpected_wire_response_payload("process killed"));
380        };
381        Ok(response)
382    }
383
384    pub async fn poll_event(
385        &mut self,
386        timeout: Duration,
387    ) -> Result<Option<EventFrame>, SidecarError> {
388        self.host
389            .poll_event(self.snapshot.ownership.clone(), timeout)
390            .await
391    }
392
393    pub async fn poll_event_wire(
394        &mut self,
395        timeout: Duration,
396    ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
397        self.poll_event(timeout)
398            .await?
399            .map(crate::wire::event_frame_from_compat)
400            .transpose()
401            .map_err(wire_protocol_error)
402    }
403
404    pub async fn guest_filesystem_call(
405        &mut self,
406        request: GuestFilesystemCallRequest,
407    ) -> Result<GuestFilesystemResultResponse, SidecarError> {
408        self.host
409            .guest_filesystem_call(self.snapshot.ownership.clone(), request)
410            .await
411    }
412
413    pub async fn guest_filesystem_call_wire(
414        &mut self,
415        request: crate::wire::GuestFilesystemCallRequest,
416    ) -> Result<crate::wire::GuestFilesystemResultResponse, SidecarError> {
417        let payload = crate::wire::request_payload_to_compat(
418            self.snapshot.ownership(),
419            crate::wire::RequestPayload::GuestFilesystemCallRequest(request),
420        )
421        .map_err(wire_protocol_error)?;
422        let crate::protocol::RequestPayload::GuestFilesystemCall(request) = payload else {
423            return Err(unexpected_wire_request_payload("guest filesystem call"));
424        };
425        let response = self.guest_filesystem_call(request).await?;
426        let payload = crate::wire::response_payload_from_compat(
427            self.snapshot.ownership(),
428            crate::protocol::ResponsePayload::GuestFilesystemResult(response),
429        )
430        .map_err(wire_protocol_error)?;
431        let crate::wire::ResponsePayload::GuestFilesystemResultResponse(response) = payload else {
432            return Err(unexpected_wire_response_payload("guest filesystem result"));
433        };
434        Ok(response)
435    }
436
437    pub async fn bind_process_to_session(
438        &mut self,
439        ext_session_id: impl Into<String>,
440        process_id: impl Into<String>,
441    ) -> Result<(), SidecarError> {
442        self.host
443            .bind_process_to_session(
444                self.snapshot.ownership.clone(),
445                self.snapshot.namespace.clone(),
446                ext_session_id.into(),
447                process_id.into(),
448            )
449            .await
450    }
451
452    pub async fn bind_vm_to_session(
453        &mut self,
454        ext_session_id: impl Into<String>,
455    ) -> Result<(), SidecarError> {
456        self.host
457            .bind_vm_to_session(
458                self.snapshot.ownership.clone(),
459                self.snapshot.namespace.clone(),
460                ext_session_id.into(),
461            )
462            .await
463    }
464
465    pub async fn dispose_session_resources(
466        &mut self,
467        ext_session_id: impl Into<String>,
468    ) -> Result<Vec<EventFrame>, SidecarError> {
469        self.host
470            .dispose_session_resources(
471                self.snapshot.ownership.clone(),
472                self.snapshot.namespace.clone(),
473                ext_session_id.into(),
474            )
475            .await
476    }
477
478    pub async fn dispose_session_resources_wire(
479        &mut self,
480        ext_session_id: impl Into<String>,
481    ) -> Result<Vec<crate::wire::EventFrame>, SidecarError> {
482        self.dispose_session_resources(ext_session_id)
483            .await?
484            .into_iter()
485            .map(crate::wire::event_frame_from_compat)
486            .collect::<Result<Vec<_>, _>>()
487            .map_err(wire_protocol_error)
488    }
489
490    pub async fn start_buffering_process_output(
491        &mut self,
492        process_id: impl Into<String>,
493    ) -> Result<(), SidecarError> {
494        self.host
495            .start_buffering_process_output(self.snapshot.ownership.clone(), process_id.into())
496            .await
497    }
498
499    pub async fn handoff_buffered_process_output(
500        &mut self,
501        ext_session_id: impl Into<String>,
502        process_id: impl Into<String>,
503        timeout: Duration,
504    ) -> Result<ExtensionBufferedProcessOutput, SidecarError> {
505        self.host
506            .handoff_buffered_process_output(
507                self.snapshot.ownership.clone(),
508                self.snapshot.namespace.clone(),
509                ext_session_id.into(),
510                process_id.into(),
511                timeout,
512            )
513            .await
514    }
515}
516
517fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
518    SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
519}
520
521fn unexpected_wire_request_payload(operation: &str) -> SidecarError {
522    SidecarError::InvalidState(format!(
523        "generated wire {operation} request converted to the wrong compatibility payload"
524    ))
525}
526
527fn unexpected_wire_response_payload(operation: &str) -> SidecarError {
528    SidecarError::InvalidState(format!(
529        "compatibility {operation} response converted to the wrong generated wire payload"
530    ))
531}
532
533fn extension_callback_response_payload(
534    namespace: &str,
535    response: SidecarResponsePayload,
536) -> Result<Vec<u8>, SidecarError> {
537    match response {
538        SidecarResponsePayload::ExtResult(envelope) if envelope.namespace == namespace => {
539            Ok(envelope.payload)
540        }
541        SidecarResponsePayload::ExtResult(envelope) => Err(SidecarError::InvalidState(format!(
542            "extension callback response namespace {} did not match {}",
543            envelope.namespace, namespace
544        ))),
545        SidecarResponsePayload::HostCallbackResult(_)
546        | SidecarResponsePayload::JsBridgeResult(_) => Err(SidecarError::InvalidState(
547            String::from("extension callback received a non-extension response"),
548        )),
549    }
550}
551
552pub enum ExtensionInterruptRequest<'a> {
553    ExtensionPayload(&'a [u8]),
554    KillProcess,
555}
556
557#[derive(Debug, Clone)]
558pub struct ExtensionInterruptResponse {
559    pub interrupted_response_payload: Vec<u8>,
560    pub interrupting_response_payload: Option<Vec<u8>>,
561}
562
563pub trait Extension: Send + Sync {
564    fn namespace(&self) -> &str;
565
566    fn handle_request<'a>(
567        &'a self,
568        ctx: ExtensionContext<'a>,
569        payload: Vec<u8>,
570    ) -> ExtensionFuture<'a, ExtensionResponse>;
571
572    fn on_vm_created<'a>(&'a self, _ctx: ExtensionSnapshot) -> ExtensionFuture<'a, ()> {
573        Box::pin(async { Ok(()) })
574    }
575
576    fn is_blocking_request(&self, _payload: &[u8]) -> bool {
577        false
578    }
579
580    fn interrupt_blocking_request(
581        &self,
582        _blocking_payload: &[u8],
583        _interrupt: ExtensionInterruptRequest<'_>,
584    ) -> Option<ExtensionInterruptResponse> {
585        None
586    }
587
588    fn on_dispose<'a>(&'a self) -> ExtensionFuture<'a, ()> {
589        Box::pin(async { Ok(()) })
590    }
591}