1use std::future::Future;
2use std::pin::Pin;
3use std::time::Duration;
4
5use crate::protocol::{
6 CloseStdinRequest, EventFrame, EventPayload, ExecuteRequest, ExtEnvelope,
7 GuestFilesystemCallRequest, GuestFilesystemResultResponse, KillProcessRequest, OwnershipScope,
8 ProcessKilledResponse, ProcessStartedResponse, SidecarRequestPayload, SidecarResponsePayload,
9 StdinClosedResponse, StdinWrittenResponse, WriteStdinRequest,
10};
11use crate::state::{SharedSidecarRequestClient, SidecarError};
12
13pub type ExtensionFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, SidecarError>> + 'a>>;
14
15pub trait ExtensionHost {
16 fn spawn_process<'a>(
17 &'a mut self,
18 ownership: OwnershipScope,
19 request: ExecuteRequest,
20 ) -> ExtensionFuture<'a, ProcessStartedResponse>;
21
22 fn write_stdin<'a>(
23 &'a mut self,
24 ownership: OwnershipScope,
25 request: WriteStdinRequest,
26 ) -> ExtensionFuture<'a, StdinWrittenResponse>;
27
28 fn close_stdin<'a>(
29 &'a mut self,
30 ownership: OwnershipScope,
31 request: CloseStdinRequest,
32 ) -> ExtensionFuture<'a, StdinClosedResponse>;
33
34 fn kill_process<'a>(
35 &'a mut self,
36 ownership: OwnershipScope,
37 request: KillProcessRequest,
38 ) -> ExtensionFuture<'a, ProcessKilledResponse>;
39
40 fn poll_event<'a>(
41 &'a mut self,
42 ownership: OwnershipScope,
43 timeout: Duration,
44 ) -> ExtensionFuture<'a, Option<EventFrame>>;
45
46 fn guest_filesystem_call<'a>(
47 &'a mut self,
48 ownership: OwnershipScope,
49 request: GuestFilesystemCallRequest,
50 ) -> ExtensionFuture<'a, GuestFilesystemResultResponse>;
51
52 fn bind_process_to_session<'a>(
53 &'a mut self,
54 ownership: OwnershipScope,
55 namespace: String,
56 ext_session_id: String,
57 process_id: String,
58 ) -> ExtensionFuture<'a, ()>;
59
60 fn bind_vm_to_session<'a>(
61 &'a mut self,
62 ownership: OwnershipScope,
63 namespace: String,
64 ext_session_id: String,
65 ) -> ExtensionFuture<'a, ()>;
66
67 fn dispose_session_resources<'a>(
68 &'a mut self,
69 ownership: OwnershipScope,
70 namespace: String,
71 ext_session_id: String,
72 ) -> ExtensionFuture<'a, Vec<EventFrame>>;
73
74 fn start_buffering_process_output<'a>(
75 &'a mut self,
76 ownership: OwnershipScope,
77 process_id: String,
78 ) -> ExtensionFuture<'a, ()>;
79
80 fn handoff_buffered_process_output<'a>(
81 &'a mut self,
82 ownership: OwnershipScope,
83 namespace: String,
84 ext_session_id: String,
85 process_id: String,
86 timeout: Duration,
87 ) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput>;
88}
89
/// Captured process output, kept as two independently bounded byte buffers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExtensionBufferedProcessOutput {
    /// Retained stdout bytes.
    pub stdout: Vec<u8>,
    /// Retained stderr bytes.
    pub stderr: Vec<u8>,
    /// Set once `append_stdout` has had to drop bytes to stay within its cap.
    pub stdout_truncated: bool,
    /// Set once `append_stderr` has had to drop bytes to stay within its cap.
    pub stderr_truncated: bool,
}
97
98impl ExtensionBufferedProcessOutput {
99 pub(crate) fn append_stdout(&mut self, chunk: &[u8], cap: usize) {
100 self.stdout_truncated |= append_bounded_bytes(&mut self.stdout, chunk, cap);
101 }
102
103 pub(crate) fn append_stderr(&mut self, chunk: &[u8], cap: usize) {
104 self.stderr_truncated |= append_bounded_bytes(&mut self.stderr, chunk, cap);
105 }
106}
107
/// Appends `chunk` to `buffer` while keeping only the most recent `cap` bytes.
///
/// Returns `true` when bytes (previously buffered or from `chunk`) had to be
/// discarded to respect `cap`, and `false` when everything fit.
///
/// Eviction happens before the copy, and only the bytes that survive are
/// copied in, so the buffer never grows beyond `max(buffer.len(), cap)`.
/// Copying an oversized chunk first and trimming afterwards would make the
/// allocation balloon to the chunk size and keep that capacity.
fn append_bounded_bytes(buffer: &mut Vec<u8>, chunk: &[u8], cap: usize) -> bool {
    if chunk.len() >= cap {
        // The chunk alone fills the whole window: nothing already buffered
        // survives, and only the chunk's last `cap` bytes are retained.
        let dropped = !buffer.is_empty() || chunk.len() > cap;
        buffer.clear();
        buffer.extend_from_slice(&chunk[chunk.len() - cap..]);
        return dropped;
    }

    // Here `chunk.len() < cap`, so any overflow is strictly smaller than
    // `buffer.len()` and the drain range below is always in bounds.
    let overflow = (buffer.len() + chunk.len()).saturating_sub(cap);
    if overflow > 0 {
        buffer.drain(..overflow);
    }
    buffer.extend_from_slice(chunk);
    overflow > 0
}
117
118#[derive(Debug, Clone)]
119pub struct ExtensionResponse {
120 pub payload: Vec<u8>,
121 pub events: Vec<EventFrame>,
122}
123
124impl ExtensionResponse {
125 pub fn new(payload: Vec<u8>) -> Self {
126 Self {
127 payload,
128 events: Vec::new(),
129 }
130 }
131
132 pub fn with_events(payload: Vec<u8>, events: Vec<EventFrame>) -> Self {
133 Self { payload, events }
134 }
135
136 pub fn with_wire_events(
137 payload: Vec<u8>,
138 events: Vec<crate::wire::EventFrame>,
139 ) -> Result<Self, SidecarError> {
140 let events = events
141 .into_iter()
142 .map(crate::wire::event_frame_to_compat)
143 .collect::<Result<Vec<_>, _>>()
144 .map_err(wire_protocol_error)?;
145 Ok(Self { payload, events })
146 }
147}
148
149#[derive(Clone)]
150pub struct ExtensionSnapshot {
151 namespace: String,
152 ownership: OwnershipScope,
153 sidecar_requests: SharedSidecarRequestClient,
154}
155
156pub struct ExtensionContext<'a> {
157 snapshot: ExtensionSnapshot,
158 host: &'a mut dyn ExtensionHost,
159}
160
161impl ExtensionSnapshot {
162 pub(crate) fn new(
163 namespace: String,
164 ownership: OwnershipScope,
165 sidecar_requests: SharedSidecarRequestClient,
166 ) -> Self {
167 Self {
168 namespace,
169 ownership,
170 sidecar_requests,
171 }
172 }
173
174 pub fn namespace(&self) -> &str {
175 &self.namespace
176 }
177
178 pub fn ownership(&self) -> &OwnershipScope {
179 &self.ownership
180 }
181
182 pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
183 EventFrame::new(
184 self.ownership.clone(),
185 EventPayload::Ext(ExtEnvelope {
186 namespace: self.namespace.clone(),
187 payload,
188 }),
189 )
190 }
191
192 pub fn ext_event_wire(
193 &self,
194 payload: Vec<u8>,
195 ) -> Result<crate::wire::EventFrame, SidecarError> {
196 crate::wire::event_frame_from_compat(self.ext_event(payload)).map_err(wire_protocol_error)
197 }
198
199 pub fn invoke_callback(
200 &self,
201 payload: Vec<u8>,
202 timeout: Duration,
203 ) -> Result<Vec<u8>, SidecarError> {
204 let response = self.sidecar_requests.invoke(
205 self.ownership.clone(),
206 SidecarRequestPayload::Ext(ExtEnvelope {
207 namespace: self.namespace.clone(),
208 payload,
209 }),
210 timeout,
211 )?;
212 extension_callback_response_payload(&self.namespace, response)
213 }
214}
215
216impl<'a> ExtensionContext<'a> {
217 pub(crate) fn new(snapshot: ExtensionSnapshot, host: &'a mut dyn ExtensionHost) -> Self {
218 Self { snapshot, host }
219 }
220
221 pub fn snapshot(&self) -> ExtensionSnapshot {
222 self.snapshot.clone()
223 }
224
225 pub fn namespace(&self) -> &str {
226 self.snapshot.namespace()
227 }
228
229 pub fn ownership(&self) -> &OwnershipScope {
230 self.snapshot.ownership()
231 }
232
233 pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
234 self.snapshot.ext_event(payload)
235 }
236
237 pub fn ext_event_wire(
238 &self,
239 payload: Vec<u8>,
240 ) -> Result<crate::wire::EventFrame, SidecarError> {
241 self.snapshot.ext_event_wire(payload)
242 }
243
244 pub fn invoke_callback(
245 &self,
246 payload: Vec<u8>,
247 timeout: Duration,
248 ) -> Result<Vec<u8>, SidecarError> {
249 self.snapshot.invoke_callback(payload, timeout)
250 }
251
252 pub async fn spawn_process(
253 &mut self,
254 request: ExecuteRequest,
255 ) -> Result<ProcessStartedResponse, SidecarError> {
256 self.host
257 .spawn_process(self.snapshot.ownership.clone(), request)
258 .await
259 }
260
261 pub async fn spawn_process_wire(
262 &mut self,
263 request: crate::wire::ExecuteRequest,
264 ) -> Result<crate::wire::ProcessStartedResponse, SidecarError> {
265 let payload = crate::wire::request_payload_to_compat(
266 self.snapshot.ownership(),
267 crate::wire::RequestPayload::ExecuteRequest(request),
268 )
269 .map_err(wire_protocol_error)?;
270 let crate::protocol::RequestPayload::Execute(request) = payload else {
271 return Err(unexpected_wire_request_payload("execute"));
272 };
273 let response = self.spawn_process(request).await?;
274 let payload = crate::wire::response_payload_from_compat(
275 self.snapshot.ownership(),
276 crate::protocol::ResponsePayload::ProcessStarted(response),
277 )
278 .map_err(wire_protocol_error)?;
279 let crate::wire::ResponsePayload::ProcessStartedResponse(response) = payload else {
280 return Err(unexpected_wire_response_payload("process started"));
281 };
282 Ok(response)
283 }
284
285 pub async fn write_stdin(
286 &mut self,
287 request: WriteStdinRequest,
288 ) -> Result<StdinWrittenResponse, SidecarError> {
289 self.host
290 .write_stdin(self.snapshot.ownership.clone(), request)
291 .await
292 }
293
294 pub async fn write_stdin_wire(
295 &mut self,
296 request: crate::wire::WriteStdinRequest,
297 ) -> Result<crate::wire::StdinWrittenResponse, SidecarError> {
298 let payload = crate::wire::request_payload_to_compat(
299 self.snapshot.ownership(),
300 crate::wire::RequestPayload::WriteStdinRequest(request),
301 )
302 .map_err(wire_protocol_error)?;
303 let crate::protocol::RequestPayload::WriteStdin(request) = payload else {
304 return Err(unexpected_wire_request_payload("write stdin"));
305 };
306 let response = self.write_stdin(request).await?;
307 let payload = crate::wire::response_payload_from_compat(
308 self.snapshot.ownership(),
309 crate::protocol::ResponsePayload::StdinWritten(response),
310 )
311 .map_err(wire_protocol_error)?;
312 let crate::wire::ResponsePayload::StdinWrittenResponse(response) = payload else {
313 return Err(unexpected_wire_response_payload("stdin written"));
314 };
315 Ok(response)
316 }
317
318 pub async fn close_stdin(
319 &mut self,
320 request: CloseStdinRequest,
321 ) -> Result<StdinClosedResponse, SidecarError> {
322 self.host
323 .close_stdin(self.snapshot.ownership.clone(), request)
324 .await
325 }
326
327 pub async fn close_stdin_wire(
328 &mut self,
329 request: crate::wire::CloseStdinRequest,
330 ) -> Result<crate::wire::StdinClosedResponse, SidecarError> {
331 let payload = crate::wire::request_payload_to_compat(
332 self.snapshot.ownership(),
333 crate::wire::RequestPayload::CloseStdinRequest(request),
334 )
335 .map_err(wire_protocol_error)?;
336 let crate::protocol::RequestPayload::CloseStdin(request) = payload else {
337 return Err(unexpected_wire_request_payload("close stdin"));
338 };
339 let response = self.close_stdin(request).await?;
340 let payload = crate::wire::response_payload_from_compat(
341 self.snapshot.ownership(),
342 crate::protocol::ResponsePayload::StdinClosed(response),
343 )
344 .map_err(wire_protocol_error)?;
345 let crate::wire::ResponsePayload::StdinClosedResponse(response) = payload else {
346 return Err(unexpected_wire_response_payload("stdin closed"));
347 };
348 Ok(response)
349 }
350
351 pub async fn kill_process(
352 &mut self,
353 request: KillProcessRequest,
354 ) -> Result<ProcessKilledResponse, SidecarError> {
355 self.host
356 .kill_process(self.snapshot.ownership.clone(), request)
357 .await
358 }
359
360 pub async fn kill_process_wire(
361 &mut self,
362 request: crate::wire::KillProcessRequest,
363 ) -> Result<crate::wire::ProcessKilledResponse, SidecarError> {
364 let payload = crate::wire::request_payload_to_compat(
365 self.snapshot.ownership(),
366 crate::wire::RequestPayload::KillProcessRequest(request),
367 )
368 .map_err(wire_protocol_error)?;
369 let crate::protocol::RequestPayload::KillProcess(request) = payload else {
370 return Err(unexpected_wire_request_payload("kill process"));
371 };
372 let response = self.kill_process(request).await?;
373 let payload = crate::wire::response_payload_from_compat(
374 self.snapshot.ownership(),
375 crate::protocol::ResponsePayload::ProcessKilled(response),
376 )
377 .map_err(wire_protocol_error)?;
378 let crate::wire::ResponsePayload::ProcessKilledResponse(response) = payload else {
379 return Err(unexpected_wire_response_payload("process killed"));
380 };
381 Ok(response)
382 }
383
384 pub async fn poll_event(
385 &mut self,
386 timeout: Duration,
387 ) -> Result<Option<EventFrame>, SidecarError> {
388 self.host
389 .poll_event(self.snapshot.ownership.clone(), timeout)
390 .await
391 }
392
393 pub async fn poll_event_wire(
394 &mut self,
395 timeout: Duration,
396 ) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
397 self.poll_event(timeout)
398 .await?
399 .map(crate::wire::event_frame_from_compat)
400 .transpose()
401 .map_err(wire_protocol_error)
402 }
403
404 pub async fn guest_filesystem_call(
405 &mut self,
406 request: GuestFilesystemCallRequest,
407 ) -> Result<GuestFilesystemResultResponse, SidecarError> {
408 self.host
409 .guest_filesystem_call(self.snapshot.ownership.clone(), request)
410 .await
411 }
412
413 pub async fn guest_filesystem_call_wire(
414 &mut self,
415 request: crate::wire::GuestFilesystemCallRequest,
416 ) -> Result<crate::wire::GuestFilesystemResultResponse, SidecarError> {
417 let payload = crate::wire::request_payload_to_compat(
418 self.snapshot.ownership(),
419 crate::wire::RequestPayload::GuestFilesystemCallRequest(request),
420 )
421 .map_err(wire_protocol_error)?;
422 let crate::protocol::RequestPayload::GuestFilesystemCall(request) = payload else {
423 return Err(unexpected_wire_request_payload("guest filesystem call"));
424 };
425 let response = self.guest_filesystem_call(request).await?;
426 let payload = crate::wire::response_payload_from_compat(
427 self.snapshot.ownership(),
428 crate::protocol::ResponsePayload::GuestFilesystemResult(response),
429 )
430 .map_err(wire_protocol_error)?;
431 let crate::wire::ResponsePayload::GuestFilesystemResultResponse(response) = payload else {
432 return Err(unexpected_wire_response_payload("guest filesystem result"));
433 };
434 Ok(response)
435 }
436
437 pub async fn bind_process_to_session(
438 &mut self,
439 ext_session_id: impl Into<String>,
440 process_id: impl Into<String>,
441 ) -> Result<(), SidecarError> {
442 self.host
443 .bind_process_to_session(
444 self.snapshot.ownership.clone(),
445 self.snapshot.namespace.clone(),
446 ext_session_id.into(),
447 process_id.into(),
448 )
449 .await
450 }
451
452 pub async fn bind_vm_to_session(
453 &mut self,
454 ext_session_id: impl Into<String>,
455 ) -> Result<(), SidecarError> {
456 self.host
457 .bind_vm_to_session(
458 self.snapshot.ownership.clone(),
459 self.snapshot.namespace.clone(),
460 ext_session_id.into(),
461 )
462 .await
463 }
464
465 pub async fn dispose_session_resources(
466 &mut self,
467 ext_session_id: impl Into<String>,
468 ) -> Result<Vec<EventFrame>, SidecarError> {
469 self.host
470 .dispose_session_resources(
471 self.snapshot.ownership.clone(),
472 self.snapshot.namespace.clone(),
473 ext_session_id.into(),
474 )
475 .await
476 }
477
478 pub async fn dispose_session_resources_wire(
479 &mut self,
480 ext_session_id: impl Into<String>,
481 ) -> Result<Vec<crate::wire::EventFrame>, SidecarError> {
482 self.dispose_session_resources(ext_session_id)
483 .await?
484 .into_iter()
485 .map(crate::wire::event_frame_from_compat)
486 .collect::<Result<Vec<_>, _>>()
487 .map_err(wire_protocol_error)
488 }
489
490 pub async fn start_buffering_process_output(
491 &mut self,
492 process_id: impl Into<String>,
493 ) -> Result<(), SidecarError> {
494 self.host
495 .start_buffering_process_output(self.snapshot.ownership.clone(), process_id.into())
496 .await
497 }
498
499 pub async fn handoff_buffered_process_output(
500 &mut self,
501 ext_session_id: impl Into<String>,
502 process_id: impl Into<String>,
503 timeout: Duration,
504 ) -> Result<ExtensionBufferedProcessOutput, SidecarError> {
505 self.host
506 .handoff_buffered_process_output(
507 self.snapshot.ownership.clone(),
508 self.snapshot.namespace.clone(),
509 ext_session_id.into(),
510 process_id.into(),
511 timeout,
512 )
513 .await
514 }
515}
516
517fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
518 SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
519}
520
521fn unexpected_wire_request_payload(operation: &str) -> SidecarError {
522 SidecarError::InvalidState(format!(
523 "generated wire {operation} request converted to the wrong compatibility payload"
524 ))
525}
526
527fn unexpected_wire_response_payload(operation: &str) -> SidecarError {
528 SidecarError::InvalidState(format!(
529 "compatibility {operation} response converted to the wrong generated wire payload"
530 ))
531}
532
533fn extension_callback_response_payload(
534 namespace: &str,
535 response: SidecarResponsePayload,
536) -> Result<Vec<u8>, SidecarError> {
537 match response {
538 SidecarResponsePayload::ExtResult(envelope) if envelope.namespace == namespace => {
539 Ok(envelope.payload)
540 }
541 SidecarResponsePayload::ExtResult(envelope) => Err(SidecarError::InvalidState(format!(
542 "extension callback response namespace {} did not match {}",
543 envelope.namespace, namespace
544 ))),
545 SidecarResponsePayload::HostCallbackResult(_)
546 | SidecarResponsePayload::JsBridgeResult(_) => Err(SidecarError::InvalidState(
547 String::from("extension callback received a non-extension response"),
548 )),
549 }
550}
551
/// The request that is attempting to interrupt an in-flight blocking request,
/// as passed to `Extension::interrupt_blocking_request`.
///
/// Derives `Debug`, `Clone` and `Copy`: both variants are at most a borrowed
/// slice, so copies are trivial and the value can be logged like the other
/// public types in this module.
#[derive(Debug, Clone, Copy)]
pub enum ExtensionInterruptRequest<'a> {
    /// An extension request, borrowed as its raw payload bytes.
    ExtensionPayload(&'a [u8]),
    /// A request to kill the process.
    KillProcess,
}
556
/// Result of `Extension::interrupt_blocking_request` when the extension
/// accepts the interruption.
#[derive(Debug, Clone)]
pub struct ExtensionInterruptResponse {
    /// Response bytes for the request that was interrupted.
    // NOTE(review): presumably delivered to the caller of the blocking
    // request — confirm against the dispatcher.
    pub interrupted_response_payload: Vec<u8>,
    /// Optional response bytes for the interrupting request; `None` when
    /// there is nothing to send back for it.
    pub interrupting_response_payload: Option<Vec<u8>>,
}
562
563pub trait Extension: Send + Sync {
564 fn namespace(&self) -> &str;
565
566 fn handle_request<'a>(
567 &'a self,
568 ctx: ExtensionContext<'a>,
569 payload: Vec<u8>,
570 ) -> ExtensionFuture<'a, ExtensionResponse>;
571
572 fn on_vm_created<'a>(&'a self, _ctx: ExtensionSnapshot) -> ExtensionFuture<'a, ()> {
573 Box::pin(async { Ok(()) })
574 }
575
576 fn is_blocking_request(&self, _payload: &[u8]) -> bool {
577 false
578 }
579
580 fn interrupt_blocking_request(
581 &self,
582 _blocking_payload: &[u8],
583 _interrupt: ExtensionInterruptRequest<'_>,
584 ) -> Option<ExtensionInterruptResponse> {
585 None
586 }
587
588 fn on_dispose<'a>(&'a self) -> ExtensionFuture<'a, ()> {
589 Box::pin(async { Ok(()) })
590 }
591}