// secure-exec-sidecar 0.3.0
//
// Native Secure Exec sidecar runtime
// Documentation
mod support;

use std::collections::HashMap;
use std::fs;
use std::time::Duration;

use secure_exec_bridge::{LoadFilesystemStateRequest, PersistenceBridge};
use secure_exec_sidecar::wire::{
    EventPayload, ExecuteRequest, ExtEnvelope, GuestFilesystemCallRequest,
    GuestFilesystemOperation, GuestRuntimeKind, RequestPayload, ResponsePayload,
    SidecarRequestPayload, SidecarResponseFrame, SidecarResponsePayload, StreamChannel,
    VmLifecycleState,
};
use secure_exec_sidecar::{
    Extension, ExtensionContext, ExtensionFuture, ExtensionResponse, SidecarError,
};
use support::{
    assert_node_available, authenticate_wire, create_vm_wire, new_sidecar, open_session_wire,
    temp_dir, wire_request, wire_vm, RecordingBridge,
};

/// Namespace reported by `EchoExtension::namespace`; the round-trip test
/// addresses its ext envelopes to this value and checks it on the replies.
const TEST_NAMESPACE: &str = "dev.rivet.secure-exec.extension-test";

/// Test extension whose request handler drives the callback, guest
/// filesystem, process, and event primitives of `ExtensionContext`.
struct EchoExtension;
/// Test extension whose request handler binds the VM to a session and then
/// disposes that session's resources.
struct VmLifetimeExtension;

impl Extension for EchoExtension {
    fn namespace(&self) -> &str {
        TEST_NAMESPACE
    }

    fn handle_request<'a>(
        &'a self,
        mut ctx: ExtensionContext<'a>,
        payload: Vec<u8>,
    ) -> ExtensionFuture<'a, ExtensionResponse> {
        Box::pin(async move {
            let callback =
                ctx.invoke_callback(b"callback-input".to_vec(), Duration::from_secs(1))?;
            let payload = String::from_utf8(payload).map_err(|error| {
                SidecarError::InvalidState(format!("invalid extension test entrypoint: {error}"))
            })?;
            let mut payload_lines = payload.lines();
            let entrypoint = payload_lines
                .next()
                .ok_or_else(|| {
                    SidecarError::InvalidState(String::from("missing extension process entrypoint"))
                })?
                .to_string();
            let lifecycle_entrypoint = payload_lines
                .next()
                .ok_or_else(|| {
                    SidecarError::InvalidState(String::from(
                        "missing extension lifecycle entrypoint",
                    ))
                })?
                .to_string();
            let process_id = "extension-process";
            ctx.start_buffering_process_output(process_id).await?;
            ctx.guest_filesystem_call_wire(GuestFilesystemCallRequest {
                operation: GuestFilesystemOperation::WriteFile,
                path: String::from("/tmp/extension-fs.txt"),
                destination_path: None,
                target: None,
                content: Some(String::from("extension fs primitive")),
                encoding: None,
                recursive: false,
                mode: None,
                uid: None,
                gid: None,
                atime_ms: None,
                mtime_ms: None,
                len: None,
                offset: None,
            })
            .await?;
            let fs_read = ctx
                .guest_filesystem_call_wire(GuestFilesystemCallRequest {
                    operation: GuestFilesystemOperation::ReadFile,
                    path: String::from("/tmp/extension-fs.txt"),
                    destination_path: None,
                    target: None,
                    content: None,
                    encoding: None,
                    recursive: false,
                    mode: None,
                    uid: None,
                    gid: None,
                    atime_ms: None,
                    mtime_ms: None,
                    len: None,
                    offset: None,
                })
                .await?;
            assert_eq!(fs_read.content.as_deref(), Some("extension fs primitive"));

            let started = ctx
                .spawn_process_wire(ExecuteRequest {
                    process_id: process_id.to_string(),
                    command: None,
                    runtime: Some(GuestRuntimeKind::JavaScript),
                    entrypoint: Some(entrypoint),
                    args: Vec::new(),
                    env: HashMap::new(),
                    cwd: None,
                    wasm_permission_tier: None,
                })
                .await?;
            assert_eq!(started.process_id, process_id);
            let handoff = ctx
                .handoff_buffered_process_output(
                    "extension-buffered-session",
                    process_id,
                    Duration::from_secs(5),
                )
                .await?;
            assert!(String::from_utf8_lossy(&handoff.stdout).contains("extension-buffered-output"));
            assert!(!handoff.stdout_truncated);
            let lifecycle_process_id = "extension-lifecycle-process";
            let lifecycle_started = ctx
                .spawn_process_wire(ExecuteRequest {
                    process_id: lifecycle_process_id.to_string(),
                    command: None,
                    runtime: Some(GuestRuntimeKind::JavaScript),
                    entrypoint: Some(lifecycle_entrypoint),
                    args: Vec::new(),
                    env: HashMap::new(),
                    cwd: None,
                    wasm_permission_tier: None,
                })
                .await?;
            assert_eq!(lifecycle_started.process_id, lifecycle_process_id);
            ctx.bind_process_to_session("extension-lifecycle-session", lifecycle_process_id)
                .await?;
            ctx.dispose_session_resources("extension-lifecycle-session")
                .await?;

            let mut stdout = handoff.stdout;
            let mut exit_code = None;
            let mut lifecycle_exit_code = None;
            while exit_code.is_none() || lifecycle_exit_code.is_none() {
                let event = ctx
                    .poll_event_wire(Duration::from_secs(5))
                    .await?
                    .ok_or_else(|| {
                        SidecarError::InvalidState(String::from(
                            "timed out waiting for extension process event",
                        ))
                    })?;
                match event.payload {
                    EventPayload::ProcessOutputEvent(output)
                        if output.process_id == process_id
                            && output.channel == StreamChannel::Stdout =>
                    {
                        stdout.extend(output.chunk);
                    }
                    EventPayload::ProcessExitedEvent(exited) if exited.process_id == process_id => {
                        exit_code = Some(exited.exit_code);
                    }
                    EventPayload::ProcessExitedEvent(exited)
                        if exited.process_id == lifecycle_process_id =>
                    {
                        lifecycle_exit_code = Some(exited.exit_code);
                    }
                    EventPayload::ProcessOutputEvent(_)
                    | EventPayload::ProcessExitedEvent(_)
                    | EventPayload::VmLifecycleEvent(_)
                    | EventPayload::StructuredEvent(_)
                    | EventPayload::ExtEnvelope(_) => {}
                }
            }

            let stdout = String::from_utf8(stdout).map_err(|error| {
                SidecarError::InvalidState(format!("invalid extension process stdout: {error}"))
            })?;
            let process_summary = format!(
                "{}:{}:{}",
                String::from_utf8_lossy(&callback),
                stdout.trim().replace('\n', "|"),
                exit_code.expect("exit code set before loop exits"),
            );
            ExtensionResponse::with_wire_events(
                process_summary.clone().into_bytes(),
                vec![ctx.ext_event_wire(format!("extension-event:{process_summary}").into_bytes())?],
            )
        })
    }
}

/// Extension used by the VM-lifetime test: it ties the VM to a session and
/// immediately disposes that session's resources.
impl Extension for VmLifetimeExtension {
    /// Namespace this extension is registered under.
    fn namespace(&self) -> &str {
        "dev.rivet.secure-exec.extension-vm-lifetime-test"
    }

    /// Binds the VM to a session, disposes the session's resources, and
    /// replies with `vm-disposed` plus the events the disposal returned.
    /// The request payload is ignored.
    fn handle_request<'a>(
        &'a self,
        mut ctx: ExtensionContext<'a>,
        _payload: Vec<u8>,
    ) -> ExtensionFuture<'a, ExtensionResponse> {
        const VM_SESSION: &str = "extension-vm-session";

        Box::pin(async move {
            ctx.bind_vm_to_session(VM_SESSION).await?;
            let disposal_events = ctx.dispose_session_resources_wire(VM_SESSION).await?;
            let body = b"vm-disposed".to_vec();
            ExtensionResponse::with_wire_events(body, disposal_events)
        })
    }
}

/// End-to-end check of `EchoExtension`: an ext request is dispatched to a VM,
/// the extension calls back into the host handler, runs its two guest
/// processes, and the response and single ext event must carry the callback
/// reply, the process stdout, and exit code 0.
#[test]
fn registered_extension_round_trips_ext_request_callback_and_event() {
    assert_node_available();

    let mut sidecar = new_sidecar("extension-roundtrip");
    sidecar
        .register_extension(Box::new(EchoExtension))
        .expect("register extension");

    // Host side of the extension callback: verify the request and answer it.
    sidecar.set_wire_sidecar_request_handler(|frame| {
        match frame.payload {
            SidecarRequestPayload::ExtEnvelope(request) => {
                assert_eq!(request.namespace, TEST_NAMESPACE);
                assert_eq!(request.payload, b"callback-input");
                let reply = ExtEnvelope {
                    namespace: request.namespace,
                    payload: b"callback-output".to_vec(),
                };
                Ok(SidecarResponseFrame {
                    schema: frame.schema,
                    request_id: frame.request_id,
                    ownership: frame.ownership,
                    payload: SidecarResponsePayload::ExtEnvelope(reply),
                })
            }
            other => panic!("unexpected sidecar request payload: {other:?}"),
        }
    });

    let connection_id = authenticate_wire(&mut sidecar, "extension-client");
    let session_id = open_session_wire(&mut sidecar, 2, &connection_id);

    // Guest scripts: one prints twice and exits, the other idles on a timer.
    let cwd = temp_dir("extension-process-cwd");
    let entrypoint = cwd.join("extension-entrypoint.mjs");
    let lifecycle_entrypoint = cwd.join("extension-lifecycle-entrypoint.mjs");
    let entrypoint_source = "console.log('extension-buffered-output');\nsetTimeout(() => {\n  console.log('extension-process-output');\n  process.exit(0);\n}, 50);\n";
    fs::write(&entrypoint, entrypoint_source).expect("write extension entrypoint");
    let lifecycle_source = "setInterval(() => {}, 1000);\n";
    fs::write(&lifecycle_entrypoint, lifecycle_source)
        .expect("write extension lifecycle entrypoint");

    let (vm_id, _) = create_vm_wire(
        &mut sidecar,
        3,
        &connection_id,
        &session_id,
        GuestRuntimeKind::JavaScript,
        &cwd,
    );

    // The extension expects both entrypoint paths, one per line.
    let request_body = [
        entrypoint.to_string_lossy(),
        lifecycle_entrypoint.to_string_lossy(),
    ]
    .join("\n")
    .into_bytes();
    let ext_request = wire_request(
        4,
        wire_vm(&connection_id, &session_id, &vm_id),
        RequestPayload::ExtEnvelope(ExtEnvelope {
            namespace: TEST_NAMESPACE.to_string(),
            payload: request_body,
        }),
    );
    let result = sidecar
        .dispatch_wire_blocking(ext_request)
        .expect("dispatch extension request");

    let expected_summary = "callback-output:extension-buffered-output|extension-process-output:0";

    match result.response.payload {
        ResponsePayload::ExtEnvelope(reply) => {
            assert_eq!(reply.namespace, TEST_NAMESPACE);
            assert_eq!(reply.payload, expected_summary.as_bytes());
        }
        other => panic!("unexpected extension response: {other:?}"),
    }

    assert_eq!(result.events.len(), 1);
    match &result.events[0].payload {
        EventPayload::ExtEnvelope(notification) => {
            assert_eq!(notification.namespace, TEST_NAMESPACE);
            assert_eq!(
                notification.payload,
                format!("extension-event:{expected_summary}").into_bytes(),
            );
        }
        other => panic!("unexpected extension event: {other:?}"),
    }
}

/// Checks that disposing a session a VM was bound to by `VmLifetimeExtension`
/// disposes the VM itself: a `Disposed` lifecycle event is reported, later
/// calls against the VM are rejected, and a filesystem snapshot is available
/// from the persistence bridge.
#[test]
fn extension_session_resources_can_dispose_bound_vm() {
    const VM_LIFETIME_NAMESPACE: &str = "dev.rivet.secure-exec.extension-vm-lifetime-test";

    assert_node_available();

    let mut sidecar = new_sidecar("extension-vm-lifetime");
    sidecar
        .register_extension(Box::new(VmLifetimeExtension))
        .expect("register vm lifetime extension");

    let connection_id = authenticate_wire(&mut sidecar, "extension-vm-client");
    let session_id = open_session_wire(&mut sidecar, 2, &connection_id);
    let cwd = temp_dir("extension-vm-lifetime-cwd");
    let (vm_id, _) = create_vm_wire(
        &mut sidecar,
        3,
        &connection_id,
        &session_id,
        GuestRuntimeKind::JavaScript,
        &cwd,
    );

    // Step 1: have the extension bind and dispose the VM.
    let dispose_request = wire_request(
        4,
        wire_vm(&connection_id, &session_id, &vm_id),
        RequestPayload::ExtEnvelope(ExtEnvelope {
            namespace: String::from(VM_LIFETIME_NAMESPACE),
            payload: Vec::new(),
        }),
    );
    let result = sidecar
        .dispatch_wire_blocking(dispose_request)
        .expect("dispatch vm lifetime extension request");

    match result.response.payload {
        ResponsePayload::ExtEnvelope(reply) => {
            assert_eq!(reply.namespace, VM_LIFETIME_NAMESPACE);
            assert_eq!(reply.payload, b"vm-disposed");
        }
        other => panic!("unexpected extension response: {other:?}"),
    }
    assert!(result.events.iter().any(|event| {
        matches!(&event.payload, EventPayload::VmLifecycleEvent(lifecycle) if lifecycle.state == VmLifecycleState::Disposed)
    }));

    // Step 2: any further call against the disposed VM must be rejected.
    let probe = GuestFilesystemCallRequest {
        operation: GuestFilesystemOperation::Exists,
        path: String::from("/tmp/extension-fs.txt"),
        destination_path: None,
        target: None,
        content: None,
        encoding: None,
        recursive: false,
        mode: None,
        uid: None,
        gid: None,
        atime_ms: None,
        mtime_ms: None,
        len: None,
        offset: None,
    };
    let probe_request = wire_request(
        5,
        wire_vm(&connection_id, &session_id, &vm_id),
        RequestPayload::GuestFilesystemCallRequest(probe),
    );
    let probe_result = sidecar
        .dispatch_wire_blocking(probe_request)
        .expect("dispatch call against disposed vm");
    match probe_result.response.payload {
        ResponsePayload::RejectedResponse(rejection) => {
            assert_eq!(rejection.code, "invalid_state");
            assert!(rejection.message.contains(&vm_id));
        }
        other => panic!("unexpected disposed-vm response: {other:?}"),
    }

    // Step 3: the bridge must be able to load a snapshot for the VM.
    sidecar
        .with_bridge_mut(|bridge: &mut RecordingBridge| {
            let load_request = LoadFilesystemStateRequest {
                vm_id: vm_id.clone(),
            };
            let snapshot = bridge
                .load_filesystem_state(load_request)
                .expect("load persisted snapshot");
            assert!(
                snapshot.is_some(),
                "extension-bound vm disposal should flush a filesystem snapshot"
            );
        })
        .expect("inspect persistence bridge");
}

/// Registering a second extension under an already-used namespace must fail
/// with `SidecarError::Conflict`.
#[test]
fn duplicate_extension_namespaces_are_rejected() {
    let mut sidecar = new_sidecar("extension-duplicate");

    let first_attempt = sidecar.register_extension(Box::new(EchoExtension));
    first_attempt.expect("register first extension");

    // Same extension type, hence the same namespace, a second time.
    let second_attempt = sidecar.register_extension(Box::new(EchoExtension));
    let error = second_attempt.expect_err("duplicate extension namespace should fail");
    assert!(matches!(error, SidecarError::Conflict(_)));
}