fission-shell 0.2.0

Shared shell runtime contracts for hosting Fission applications
Documentation
use fission_core::{
    BoxFuture, CapabilityCtx, CapabilityType, JobCtx, JobRef, JobSpec, OperationCapability,
    ServiceCtx, ServiceRunner, ServiceSlot, ServiceSpec, ServiceType,
};
use fission_shell::async_host::{AsyncMessage, AsyncRegistry, ServiceControlMessage};
use serde::{Deserialize, Serialize};
use std::sync::{mpsc, Arc};
use std::time::Duration;

#[derive(Debug)]
struct EchoJob;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct EchoRequest {
    value: String,
}

impl JobSpec for EchoJob {
    type Request = EchoRequest;
    type Ok = String;
    type Err = String;
    const NAME: &'static str = "echo-job";
}

const ECHO_JOB: JobRef<EchoJob> = JobRef::new("echo-job");

#[derive(Debug)]
struct SyncService;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct SyncConfig {
    prefix: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum SyncCommand {
    Ping,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct SyncCommandOk;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct SyncCommandErr {
    message: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum SyncEvent {
    Connected,
    Pong,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct SyncStartErr {
    message: String,
}

impl ServiceSpec for SyncService {
    type Config = SyncConfig;
    type Command = SyncCommand;
    type CommandOk = SyncCommandOk;
    type CommandErr = SyncCommandErr;
    type Event = SyncEvent;
    type StartErr = SyncStartErr;
    const NAME: &'static str = "sync-service";
}

const SYNC_TYPE: ServiceType<SyncService> = ServiceType::new("sync-service");

struct SyncRunner;

impl ServiceRunner<SyncService> for SyncRunner {
    fn on_command(
        &mut self,
        command: SyncCommand,
        ctx: ServiceCtx<SyncService>,
    ) -> BoxFuture<Result<SyncCommandOk, SyncCommandErr>> {
        Box::pin(async move {
            if matches!(command, SyncCommand::Ping) {
                let _ = ctx.emit(SyncEvent::Pong).await;
            }
            Ok(SyncCommandOk)
        })
    }

    fn on_stop(self: Box<Self>, _ctx: ServiceCtx<SyncService>) -> BoxFuture<()> {
        Box::pin(async {})
    }
}

#[derive(Debug)]
struct StreamingService;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct StreamingConfig;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum StreamingCommand {
    StopPings,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct StreamingCommandOk;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct StreamingCommandErr {
    message: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum StreamingEvent {
    DelayedReady,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct StreamingStartErr {
    message: String,
}

impl ServiceSpec for StreamingService {
    type Config = StreamingConfig;
    type Command = StreamingCommand;
    type CommandOk = StreamingCommandOk;
    type CommandErr = StreamingCommandErr;
    type Event = StreamingEvent;
    type StartErr = StreamingStartErr;
    const NAME: &'static str = "streaming-service";
}

const STREAMING_TYPE: ServiceType<StreamingService> = ServiceType::new("streaming-service");

#[derive(Debug)]
struct UploadEchoCapability;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct UploadEchoRequest {
    path: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct UploadEchoOk;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct UploadEchoErr {
    message: String,
}

impl OperationCapability for UploadEchoCapability {
    type Request = UploadEchoRequest;
    type Ok = UploadEchoOk;
    type Err = UploadEchoErr;
}

const UPLOAD_ECHO_CAPABILITY: CapabilityType<UploadEchoCapability> =
    CapabilityType::new("upload-echo");

struct StreamingRunner {
    ready_task: Option<tokio::task::JoinHandle<()>>,
}

impl ServiceRunner<StreamingService> for StreamingRunner {
    fn on_command(
        &mut self,
        command: StreamingCommand,
        _ctx: ServiceCtx<StreamingService>,
    ) -> BoxFuture<Result<StreamingCommandOk, StreamingCommandErr>> {
        if matches!(command, StreamingCommand::StopPings) {
            if let Some(task) = self.ready_task.take() {
                task.abort();
            }
        }
        Box::pin(async { Ok(StreamingCommandOk) })
    }

    fn on_stop(mut self: Box<Self>, _ctx: ServiceCtx<StreamingService>) -> BoxFuture<()> {
        if let Some(task) = self.ready_task.take() {
            task.abort();
        }
        Box::pin(async {})
    }
}

#[test]
fn registered_jobs_emit_typed_results() {
    let mut registry = AsyncRegistry::new();
    registry.register_job(ECHO_JOB, |request: EchoRequest, _ctx: JobCtx| async move {
        Ok::<_, String>(format!("echo:{}", request.value))
    });

    let (tx, rx) = mpsc::channel();
    let spawned = registry.spawn_job(
        ECHO_JOB.name,
        7,
        serde_json::to_vec(&EchoRequest {
            value: "hello".into(),
        })
        .unwrap(),
        None,
        None,
        None,
        &tx,
        Arc::new(|| {}),
    );

    assert!(spawned);
    let message = rx.recv_timeout(Duration::from_secs(1)).expect("job result");
    match message {
        AsyncMessage::JobOk {
            req_id, payload, ..
        } => {
            assert_eq!(req_id, 7);
            let ok: String = serde_json::from_slice(&payload).unwrap();
            assert_eq!(ok, "echo:hello");
        }
        other => panic!("unexpected message: {:?}", other),
    }
}

#[test]
fn registered_services_start_accept_commands_and_stop() {
    let mut registry = AsyncRegistry::new();
    registry.register_service(
        SYNC_TYPE,
        |config: SyncConfig, ctx: ServiceCtx<SyncService>| async move {
            let _ = ctx.emit(SyncEvent::Connected).await;
            let _ = config.prefix;
            Ok::<_, SyncStartErr>(Box::new(SyncRunner) as Box<dyn ServiceRunner<SyncService>>)
        },
    );

    let (tx, rx) = mpsc::channel();
    let handle = registry
        .spawn_service(
            SYNC_TYPE.name,
            ServiceSlot::singleton(SYNC_TYPE).slot_key(),
            3,
            serde_json::to_vec(&SyncConfig {
                prefix: "demo".into(),
            })
            .unwrap(),
            None,
            &tx,
            Arc::new(|| {}),
        )
        .expect("service handle");

    let first = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("started or event");
    let second = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("started or event");
    assert!(
        matches!(first, AsyncMessage::ServiceStarted { .. })
            || matches!(second, AsyncMessage::ServiceStarted { .. })
    );
    assert!(
        matches!(first, AsyncMessage::ServiceEvent { .. })
            || matches!(second, AsyncMessage::ServiceEvent { .. })
    );

    handle
        .control_tx
        .send(ServiceControlMessage::Command {
            req_id: 9,
            payload: serde_json::to_vec(&SyncCommand::Ping).unwrap(),
            on_ok: None,
            on_err: None,
        })
        .unwrap();

    let third = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("command event");
    let fourth = rx.recv_timeout(Duration::from_secs(1)).expect("command ok");
    assert!(
        matches!(third, AsyncMessage::ServiceEvent { .. })
            || matches!(fourth, AsyncMessage::ServiceEvent { .. })
    );
    assert!(
        matches!(third, AsyncMessage::ServiceCommandOk { req_id: 9, .. })
            || matches!(fourth, AsyncMessage::ServiceCommandOk { req_id: 9, .. })
    );

    handle.control_tx.send(ServiceControlMessage::Stop).unwrap();
    let stopped = rx.recv_timeout(Duration::from_secs(1)).expect("stopped");
    assert!(matches!(
        stopped,
        AsyncMessage::ServiceStopped { instance_id: 3, .. }
    ));
}

#[test]
fn registered_services_can_emit_from_spawned_background_tasks() {
    let mut registry = AsyncRegistry::new();
    registry.register_service(
        STREAMING_TYPE,
        |_config: StreamingConfig, ctx: ServiceCtx<StreamingService>| async move {
            let ready_ctx = ctx.clone();
            let ready_task = tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(25)).await;
                let _ = ready_ctx.emit(StreamingEvent::DelayedReady).await;
            });
            Ok::<_, StreamingStartErr>(Box::new(StreamingRunner {
                ready_task: Some(ready_task),
            }) as Box<dyn ServiceRunner<StreamingService>>)
        },
    );

    let (tx, rx) = mpsc::channel();
    let handle = registry
        .spawn_service(
            STREAMING_TYPE.name,
            ServiceSlot::singleton(STREAMING_TYPE).slot_key(),
            11,
            serde_json::to_vec(&StreamingConfig).unwrap(),
            None,
            &tx,
            Arc::new(|| {}),
        )
        .expect("service handle");

    let first = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("service start");
    assert!(matches!(first, AsyncMessage::ServiceStarted { .. }));

    let second = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("delayed service event");
    match second {
        AsyncMessage::ServiceEvent { payload, .. } => {
            let event: StreamingEvent = serde_json::from_slice(&payload).unwrap();
            assert_eq!(event, StreamingEvent::DelayedReady);
        }
        other => panic!("unexpected message: {:?}", other),
    }

    handle.control_tx.send(ServiceControlMessage::Stop).unwrap();
    let stopped = rx.recv_timeout(Duration::from_secs(1)).expect("stopped");
    assert!(matches!(stopped, AsyncMessage::ServiceStopped { .. }));
}

#[test]
fn registered_operation_capabilities_emit_typed_results() {
    let mut registry = AsyncRegistry::new();
    registry.register_operation_capability(
        UPLOAD_ECHO_CAPABILITY,
        |_request: UploadEchoRequest, _ctx: CapabilityCtx| async move {
            Ok::<_, UploadEchoErr>(UploadEchoOk)
        },
    );

    let (tx, rx) = mpsc::channel();
    let spawned = registry.spawn_capability(
        UPLOAD_ECHO_CAPABILITY.name,
        13,
        serde_json::to_vec(&UploadEchoRequest {
            path: "/tmp/asset.bin".into(),
        })
        .unwrap(),
        None,
        None,
        None,
        &tx,
        Arc::new(|| {}),
    );

    assert!(spawned);
    let message = rx
        .recv_timeout(Duration::from_secs(1))
        .expect("capability result");
    match message {
        AsyncMessage::CapabilityOk {
            req_id,
            capability_name,
            ..
        } => {
            assert_eq!(req_id, 13);
            assert_eq!(capability_name, UPLOAD_ECHO_CAPABILITY.name);
        }
        other => panic!("unexpected message: {:?}", other),
    }
}