restate-sdk-shared-core 0.10.0

SDK Shared core
Documentation
mod async_result;
mod calls;
mod failures;
mod implicit_cancellation;
mod input_output;
mod promise;
mod run;
mod sleep;
mod state;
mod suspensions;

use super::*;

use crate::service_protocol::messages::{
    output_command_message, signal_notification_message, ErrorMessage, InputCommandMessage,
    OutputCommandMessage, RestateMessage, SignalNotificationMessage, StartMessage,
    SuspensionMessage,
};
use crate::service_protocol::{messages, CompletionId, Decoder, Encoder, RawMessage, Version};
use bytes::Bytes;
use googletest::prelude::*;
use test_log::test;

// --- Test infra

impl CoreVM {
    fn mock_init(version: Version) -> CoreVM {
        Self::mock_init_with_options(version, Default::default())
    }

    fn mock_init_with_options(version: Version, options: VMOptions) -> CoreVM {
        let vm = CoreVM::new(
            vec![("content-type".to_owned(), version.to_string())],
            options,
        )
        .unwrap();

        assert_that!(
            vm.get_response_head().headers,
            contains(eq(Header {
                key: Cow::Borrowed("content-type"),
                value: Cow::Borrowed(version.content_type())
            }))
        );

        vm
    }
}

struct VMTestCase {
    encoder: Encoder,
    vm: CoreVM,
}

impl VMTestCase {
    fn new() -> Self {
        Self {
            encoder: Encoder::new(Version::maximum_supported_version()),
            vm: CoreVM::mock_init(Version::maximum_supported_version()),
        }
    }

    fn with_vm_options(options: VMOptions) -> Self {
        Self {
            encoder: Encoder::new(Version::maximum_supported_version()),
            vm: CoreVM::mock_init_with_options(Version::maximum_supported_version(), options),
        }
    }

    fn input<M: RestateMessage>(mut self, m: M) -> Self {
        self.vm.notify_input(self.encoder.encode(&m));
        self
    }

    fn run(mut self, user_code: impl FnOnce(&mut CoreVM)) -> OutputIterator {
        self.vm.notify_input_closed();
        assert!(self.vm.is_ready_to_execute().unwrap());

        user_code(&mut self.vm);

        OutputIterator::collect_vm(&mut self.vm)
    }

    fn run_without_closing_input(
        mut self,
        user_code: impl FnOnce(&mut CoreVM, &Encoder),
    ) -> OutputIterator {
        assert!(self.vm.is_ready_to_execute().unwrap());

        user_code(&mut self.vm, &self.encoder);

        OutputIterator::collect_vm(&mut self.vm)
    }
}

struct OutputIterator(Decoder);

impl OutputIterator {
    fn collect_vm(vm: &mut impl VM) -> Self {
        let mut decoder = Decoder::new(Version::maximum_supported_version());
        while let TakeOutputResult::Buffer(b) = vm.take_output() {
            decoder.push(b);
        }
        assert_eq!(vm.take_output(), TakeOutputResult::EOF);

        Self(decoder)
    }

    fn next_decoded<M: RestateMessage>(&mut self) -> Option<M> {
        self.0
            .consume_next()
            .unwrap()
            .map(|msg| msg.decode_to::<M>(0).unwrap())
    }
}

impl Iterator for OutputIterator {
    type Item = RawMessage;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.consume_next().unwrap()
    }
}

// --- Matchers

/// Matcher for Error
pub fn eq_error(vm_error: Error) -> impl Matcher<ActualT = Error> {
    pat!(Error {
        code: eq(vm_error.code),
        message: eq(vm_error.message),
        stacktrace: eq(vm_error.stacktrace)
    })
}

/// Matcher for ErrorMessage to equal Error
pub fn error_message_as_error(vm_error: Error) -> impl Matcher<ActualT = ErrorMessage> {
    pat!(ErrorMessage {
        code: eq(vm_error.code as u32),
        message: eq(vm_error.message),
        stacktrace: eq(vm_error.stacktrace)
    })
}

pub fn suspended_waiting_completion(
    completion_id: CompletionId,
) -> impl Matcher<ActualT = SuspensionMessage> {
    pat!(SuspensionMessage {
        waiting_completions: eq(vec![completion_id]),
        waiting_signals: eq(vec![1])
    })
}

pub fn suspended_waiting_signal(signal_idx: u32) -> impl Matcher<ActualT = SuspensionMessage> {
    pat!(SuspensionMessage {
        waiting_signals: all!(contains(eq(signal_idx)), contains(eq(1)))
    })
}

pub fn is_suspended() -> impl Matcher<ActualT = Error> {
    predicate(|e: &Error| e.is_suspended_error())
        .with_description("is suspended error", "is not suspended error")
}

pub fn is_closed() -> impl Matcher<ActualT = Error> {
    predicate(|e: &Error| e.code == error::codes::CLOSED.code())
        .with_description("is closed error", "is not closed error")
}

pub fn is_output_with_success(b: impl AsRef<[u8]>) -> impl Matcher<ActualT = OutputCommandMessage> {
    pat!(OutputCommandMessage {
        result: some(pat!(output_command_message::Result::Value(eq(
            Bytes::copy_from_slice(b.as_ref()).into()
        ))))
    })
}

pub fn is_output_with_failure(
    code: u16,
    message: impl Into<String>,
) -> impl Matcher<ActualT = OutputCommandMessage> {
    pat!(OutputCommandMessage {
        result: some(pat!(output_command_message::Result::Failure(eq(
            messages::Failure {
                code: code as u32,
                message: message.into(),
                metadata: vec![],
            }
        ))))
    })
}

// --- Mocks

pub fn start_message(known_entries: u32) -> StartMessage {
    StartMessage {
        id: Bytes::from_static(b"123"),
        debug_id: "123".to_string(),
        known_entries,
        state_map: vec![],
        partial_state: true,
        key: "".to_string(),
        retry_count_since_last_stored_entry: 0,
        duration_since_last_stored_entry: 0,
        random_seed: 0,
    }
}

pub fn input_entry_message(b: impl AsRef<[u8]>) -> InputCommandMessage {
    InputCommandMessage {
        headers: vec![],
        value: Some(Bytes::copy_from_slice(b.as_ref()).into()),
        ..InputCommandMessage::default()
    }
}

pub fn cancel_signal_notification() -> SignalNotificationMessage {
    SignalNotificationMessage {
        signal_id: Some(signal_notification_message::SignalId::Idx(1)),
        result: Some(signal_notification_message::Result::Void(Default::default())),
    }
}

#[test]
fn take_output_on_newly_initialized_vm() {
    let mut vm = CoreVM::mock_init(Version::maximum_supported_version());
    assert_that!(
        vm.take_output(),
        eq(TakeOutputResult::Buffer(Bytes::default()))
    );
}

#[test]
fn instantiate_core_vm_minimum_supported_version() {
    CoreVM::mock_init(Version::minimum_supported_version());
}