use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use crate::protocol::{
CloseStdinRequest, EventFrame, EventPayload, ExecuteRequest, ExtEnvelope,
GuestFilesystemCallRequest, GuestFilesystemResultResponse, KillProcessRequest, OwnershipScope,
ProcessKilledResponse, ProcessStartedResponse, SidecarRequestPayload, SidecarResponsePayload,
StdinClosedResponse, StdinWrittenResponse, WriteStdinRequest,
};
use crate::state::{SharedSidecarRequestClient, SidecarError};
pub type ExtensionFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, SidecarError>> + 'a>>;
pub trait ExtensionHost {
fn spawn_process<'a>(
&'a mut self,
ownership: OwnershipScope,
request: ExecuteRequest,
) -> ExtensionFuture<'a, ProcessStartedResponse>;
fn write_stdin<'a>(
&'a mut self,
ownership: OwnershipScope,
request: WriteStdinRequest,
) -> ExtensionFuture<'a, StdinWrittenResponse>;
fn close_stdin<'a>(
&'a mut self,
ownership: OwnershipScope,
request: CloseStdinRequest,
) -> ExtensionFuture<'a, StdinClosedResponse>;
fn kill_process<'a>(
&'a mut self,
ownership: OwnershipScope,
request: KillProcessRequest,
) -> ExtensionFuture<'a, ProcessKilledResponse>;
fn poll_event<'a>(
&'a mut self,
ownership: OwnershipScope,
timeout: Duration,
) -> ExtensionFuture<'a, Option<EventFrame>>;
fn guest_filesystem_call<'a>(
&'a mut self,
ownership: OwnershipScope,
request: GuestFilesystemCallRequest,
) -> ExtensionFuture<'a, GuestFilesystemResultResponse>;
fn bind_process_to_session<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
process_id: String,
) -> ExtensionFuture<'a, ()>;
fn bind_vm_to_session<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
) -> ExtensionFuture<'a, ()>;
fn dispose_session_resources<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
) -> ExtensionFuture<'a, Vec<EventFrame>>;
fn start_buffering_process_output<'a>(
&'a mut self,
ownership: OwnershipScope,
process_id: String,
) -> ExtensionFuture<'a, ()>;
fn handoff_buffered_process_output<'a>(
&'a mut self,
ownership: OwnershipScope,
namespace: String,
ext_session_id: String,
process_id: String,
timeout: Duration,
) -> ExtensionFuture<'a, ExtensionBufferedProcessOutput>;
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExtensionBufferedProcessOutput {
pub stdout: Vec<u8>,
pub stderr: Vec<u8>,
pub stdout_truncated: bool,
pub stderr_truncated: bool,
}
impl ExtensionBufferedProcessOutput {
pub(crate) fn append_stdout(&mut self, chunk: &[u8], cap: usize) {
self.stdout_truncated |= append_bounded_bytes(&mut self.stdout, chunk, cap);
}
pub(crate) fn append_stderr(&mut self, chunk: &[u8], cap: usize) {
self.stderr_truncated |= append_bounded_bytes(&mut self.stderr, chunk, cap);
}
}
fn append_bounded_bytes(buffer: &mut Vec<u8>, chunk: &[u8], cap: usize) -> bool {
buffer.extend_from_slice(chunk);
if buffer.len() <= cap {
return false;
}
let remove_len = buffer.len() - cap;
buffer.drain(..remove_len);
true
}
#[derive(Debug, Clone)]
pub struct ExtensionResponse {
pub payload: Vec<u8>,
pub events: Vec<EventFrame>,
}
impl ExtensionResponse {
pub fn new(payload: Vec<u8>) -> Self {
Self {
payload,
events: Vec::new(),
}
}
pub fn with_events(payload: Vec<u8>, events: Vec<EventFrame>) -> Self {
Self { payload, events }
}
pub fn with_wire_events(
payload: Vec<u8>,
events: Vec<crate::wire::EventFrame>,
) -> Result<Self, SidecarError> {
let events = events
.into_iter()
.map(crate::wire::event_frame_to_compat)
.collect::<Result<Vec<_>, _>>()
.map_err(wire_protocol_error)?;
Ok(Self { payload, events })
}
}
#[derive(Clone)]
pub struct ExtensionSnapshot {
namespace: String,
ownership: OwnershipScope,
sidecar_requests: SharedSidecarRequestClient,
}
pub struct ExtensionContext<'a> {
snapshot: ExtensionSnapshot,
host: &'a mut dyn ExtensionHost,
}
impl ExtensionSnapshot {
pub(crate) fn new(
namespace: String,
ownership: OwnershipScope,
sidecar_requests: SharedSidecarRequestClient,
) -> Self {
Self {
namespace,
ownership,
sidecar_requests,
}
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn ownership(&self) -> &OwnershipScope {
&self.ownership
}
pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
EventFrame::new(
self.ownership.clone(),
EventPayload::Ext(ExtEnvelope {
namespace: self.namespace.clone(),
payload,
}),
)
}
pub fn ext_event_wire(
&self,
payload: Vec<u8>,
) -> Result<crate::wire::EventFrame, SidecarError> {
crate::wire::event_frame_from_compat(self.ext_event(payload)).map_err(wire_protocol_error)
}
pub fn invoke_callback(
&self,
payload: Vec<u8>,
timeout: Duration,
) -> Result<Vec<u8>, SidecarError> {
let response = self.sidecar_requests.invoke(
self.ownership.clone(),
SidecarRequestPayload::Ext(ExtEnvelope {
namespace: self.namespace.clone(),
payload,
}),
timeout,
)?;
extension_callback_response_payload(&self.namespace, response)
}
}
impl<'a> ExtensionContext<'a> {
pub(crate) fn new(snapshot: ExtensionSnapshot, host: &'a mut dyn ExtensionHost) -> Self {
Self { snapshot, host }
}
pub fn snapshot(&self) -> ExtensionSnapshot {
self.snapshot.clone()
}
pub fn namespace(&self) -> &str {
self.snapshot.namespace()
}
pub fn ownership(&self) -> &OwnershipScope {
self.snapshot.ownership()
}
pub fn ext_event(&self, payload: Vec<u8>) -> EventFrame {
self.snapshot.ext_event(payload)
}
pub fn ext_event_wire(
&self,
payload: Vec<u8>,
) -> Result<crate::wire::EventFrame, SidecarError> {
self.snapshot.ext_event_wire(payload)
}
pub fn invoke_callback(
&self,
payload: Vec<u8>,
timeout: Duration,
) -> Result<Vec<u8>, SidecarError> {
self.snapshot.invoke_callback(payload, timeout)
}
pub async fn spawn_process(
&mut self,
request: ExecuteRequest,
) -> Result<ProcessStartedResponse, SidecarError> {
self.host
.spawn_process(self.snapshot.ownership.clone(), request)
.await
}
pub async fn spawn_process_wire(
&mut self,
request: crate::wire::ExecuteRequest,
) -> Result<crate::wire::ProcessStartedResponse, SidecarError> {
let payload = crate::wire::request_payload_to_compat(
self.snapshot.ownership(),
crate::wire::RequestPayload::ExecuteRequest(request),
)
.map_err(wire_protocol_error)?;
let crate::protocol::RequestPayload::Execute(request) = payload else {
return Err(unexpected_wire_request_payload("execute"));
};
let response = self.spawn_process(request).await?;
let payload = crate::wire::response_payload_from_compat(
self.snapshot.ownership(),
crate::protocol::ResponsePayload::ProcessStarted(response),
)
.map_err(wire_protocol_error)?;
let crate::wire::ResponsePayload::ProcessStartedResponse(response) = payload else {
return Err(unexpected_wire_response_payload("process started"));
};
Ok(response)
}
pub async fn write_stdin(
&mut self,
request: WriteStdinRequest,
) -> Result<StdinWrittenResponse, SidecarError> {
self.host
.write_stdin(self.snapshot.ownership.clone(), request)
.await
}
pub async fn write_stdin_wire(
&mut self,
request: crate::wire::WriteStdinRequest,
) -> Result<crate::wire::StdinWrittenResponse, SidecarError> {
let payload = crate::wire::request_payload_to_compat(
self.snapshot.ownership(),
crate::wire::RequestPayload::WriteStdinRequest(request),
)
.map_err(wire_protocol_error)?;
let crate::protocol::RequestPayload::WriteStdin(request) = payload else {
return Err(unexpected_wire_request_payload("write stdin"));
};
let response = self.write_stdin(request).await?;
let payload = crate::wire::response_payload_from_compat(
self.snapshot.ownership(),
crate::protocol::ResponsePayload::StdinWritten(response),
)
.map_err(wire_protocol_error)?;
let crate::wire::ResponsePayload::StdinWrittenResponse(response) = payload else {
return Err(unexpected_wire_response_payload("stdin written"));
};
Ok(response)
}
pub async fn close_stdin(
&mut self,
request: CloseStdinRequest,
) -> Result<StdinClosedResponse, SidecarError> {
self.host
.close_stdin(self.snapshot.ownership.clone(), request)
.await
}
pub async fn close_stdin_wire(
&mut self,
request: crate::wire::CloseStdinRequest,
) -> Result<crate::wire::StdinClosedResponse, SidecarError> {
let payload = crate::wire::request_payload_to_compat(
self.snapshot.ownership(),
crate::wire::RequestPayload::CloseStdinRequest(request),
)
.map_err(wire_protocol_error)?;
let crate::protocol::RequestPayload::CloseStdin(request) = payload else {
return Err(unexpected_wire_request_payload("close stdin"));
};
let response = self.close_stdin(request).await?;
let payload = crate::wire::response_payload_from_compat(
self.snapshot.ownership(),
crate::protocol::ResponsePayload::StdinClosed(response),
)
.map_err(wire_protocol_error)?;
let crate::wire::ResponsePayload::StdinClosedResponse(response) = payload else {
return Err(unexpected_wire_response_payload("stdin closed"));
};
Ok(response)
}
pub async fn kill_process(
&mut self,
request: KillProcessRequest,
) -> Result<ProcessKilledResponse, SidecarError> {
self.host
.kill_process(self.snapshot.ownership.clone(), request)
.await
}
pub async fn kill_process_wire(
&mut self,
request: crate::wire::KillProcessRequest,
) -> Result<crate::wire::ProcessKilledResponse, SidecarError> {
let payload = crate::wire::request_payload_to_compat(
self.snapshot.ownership(),
crate::wire::RequestPayload::KillProcessRequest(request),
)
.map_err(wire_protocol_error)?;
let crate::protocol::RequestPayload::KillProcess(request) = payload else {
return Err(unexpected_wire_request_payload("kill process"));
};
let response = self.kill_process(request).await?;
let payload = crate::wire::response_payload_from_compat(
self.snapshot.ownership(),
crate::protocol::ResponsePayload::ProcessKilled(response),
)
.map_err(wire_protocol_error)?;
let crate::wire::ResponsePayload::ProcessKilledResponse(response) = payload else {
return Err(unexpected_wire_response_payload("process killed"));
};
Ok(response)
}
pub async fn poll_event(
&mut self,
timeout: Duration,
) -> Result<Option<EventFrame>, SidecarError> {
self.host
.poll_event(self.snapshot.ownership.clone(), timeout)
.await
}
pub async fn poll_event_wire(
&mut self,
timeout: Duration,
) -> Result<Option<crate::wire::EventFrame>, SidecarError> {
self.poll_event(timeout)
.await?
.map(crate::wire::event_frame_from_compat)
.transpose()
.map_err(wire_protocol_error)
}
pub async fn guest_filesystem_call(
&mut self,
request: GuestFilesystemCallRequest,
) -> Result<GuestFilesystemResultResponse, SidecarError> {
self.host
.guest_filesystem_call(self.snapshot.ownership.clone(), request)
.await
}
pub async fn guest_filesystem_call_wire(
&mut self,
request: crate::wire::GuestFilesystemCallRequest,
) -> Result<crate::wire::GuestFilesystemResultResponse, SidecarError> {
let payload = crate::wire::request_payload_to_compat(
self.snapshot.ownership(),
crate::wire::RequestPayload::GuestFilesystemCallRequest(request),
)
.map_err(wire_protocol_error)?;
let crate::protocol::RequestPayload::GuestFilesystemCall(request) = payload else {
return Err(unexpected_wire_request_payload("guest filesystem call"));
};
let response = self.guest_filesystem_call(request).await?;
let payload = crate::wire::response_payload_from_compat(
self.snapshot.ownership(),
crate::protocol::ResponsePayload::GuestFilesystemResult(response),
)
.map_err(wire_protocol_error)?;
let crate::wire::ResponsePayload::GuestFilesystemResultResponse(response) = payload else {
return Err(unexpected_wire_response_payload("guest filesystem result"));
};
Ok(response)
}
pub async fn bind_process_to_session(
&mut self,
ext_session_id: impl Into<String>,
process_id: impl Into<String>,
) -> Result<(), SidecarError> {
self.host
.bind_process_to_session(
self.snapshot.ownership.clone(),
self.snapshot.namespace.clone(),
ext_session_id.into(),
process_id.into(),
)
.await
}
pub async fn bind_vm_to_session(
&mut self,
ext_session_id: impl Into<String>,
) -> Result<(), SidecarError> {
self.host
.bind_vm_to_session(
self.snapshot.ownership.clone(),
self.snapshot.namespace.clone(),
ext_session_id.into(),
)
.await
}
pub async fn dispose_session_resources(
&mut self,
ext_session_id: impl Into<String>,
) -> Result<Vec<EventFrame>, SidecarError> {
self.host
.dispose_session_resources(
self.snapshot.ownership.clone(),
self.snapshot.namespace.clone(),
ext_session_id.into(),
)
.await
}
pub async fn dispose_session_resources_wire(
&mut self,
ext_session_id: impl Into<String>,
) -> Result<Vec<crate::wire::EventFrame>, SidecarError> {
self.dispose_session_resources(ext_session_id)
.await?
.into_iter()
.map(crate::wire::event_frame_from_compat)
.collect::<Result<Vec<_>, _>>()
.map_err(wire_protocol_error)
}
pub async fn start_buffering_process_output(
&mut self,
process_id: impl Into<String>,
) -> Result<(), SidecarError> {
self.host
.start_buffering_process_output(self.snapshot.ownership.clone(), process_id.into())
.await
}
pub async fn handoff_buffered_process_output(
&mut self,
ext_session_id: impl Into<String>,
process_id: impl Into<String>,
timeout: Duration,
) -> Result<ExtensionBufferedProcessOutput, SidecarError> {
self.host
.handoff_buffered_process_output(
self.snapshot.ownership.clone(),
self.snapshot.namespace.clone(),
ext_session_id.into(),
process_id.into(),
timeout,
)
.await
}
}
fn wire_protocol_error(error: crate::wire::ProtocolCodecError) -> SidecarError {
SidecarError::InvalidState(format!("invalid generated wire protocol frame: {error}"))
}
fn unexpected_wire_request_payload(operation: &str) -> SidecarError {
SidecarError::InvalidState(format!(
"generated wire {operation} request converted to the wrong compatibility payload"
))
}
fn unexpected_wire_response_payload(operation: &str) -> SidecarError {
SidecarError::InvalidState(format!(
"compatibility {operation} response converted to the wrong generated wire payload"
))
}
fn extension_callback_response_payload(
namespace: &str,
response: SidecarResponsePayload,
) -> Result<Vec<u8>, SidecarError> {
match response {
SidecarResponsePayload::ExtResult(envelope) if envelope.namespace == namespace => {
Ok(envelope.payload)
}
SidecarResponsePayload::ExtResult(envelope) => Err(SidecarError::InvalidState(format!(
"extension callback response namespace {} did not match {}",
envelope.namespace, namespace
))),
SidecarResponsePayload::HostCallbackResult(_)
| SidecarResponsePayload::JsBridgeResult(_) => Err(SidecarError::InvalidState(
String::from("extension callback received a non-extension response"),
)),
}
}
pub enum ExtensionInterruptRequest<'a> {
ExtensionPayload(&'a [u8]),
KillProcess,
}
#[derive(Debug, Clone)]
pub struct ExtensionInterruptResponse {
pub interrupted_response_payload: Vec<u8>,
pub interrupting_response_payload: Option<Vec<u8>>,
}
pub trait Extension: Send + Sync {
fn namespace(&self) -> &str;
fn handle_request<'a>(
&'a self,
ctx: ExtensionContext<'a>,
payload: Vec<u8>,
) -> ExtensionFuture<'a, ExtensionResponse>;
fn on_vm_created<'a>(&'a self, _ctx: ExtensionSnapshot) -> ExtensionFuture<'a, ()> {
Box::pin(async { Ok(()) })
}
fn is_blocking_request(&self, _payload: &[u8]) -> bool {
false
}
fn interrupt_blocking_request(
&self,
_blocking_payload: &[u8],
_interrupt: ExtensionInterruptRequest<'_>,
) -> Option<ExtensionInterruptResponse> {
None
}
fn on_dispose<'a>(&'a self) -> ExtensionFuture<'a, ()> {
Box::pin(async { Ok(()) })
}
}