use std::sync::Arc;
use futures::future::BoxFuture;
pub trait ActivityDispatcher: Send + Sync + 'static {
fn dispatch(
&self,
name: &str,
input: &str,
config: &str,
attempt: u32,
) -> Result<String, String>;
fn dispatch_from_process(
&self,
name: &str,
input: &str,
config: &str,
attempt: u32,
caller_pid: Option<u64>,
) -> Result<String, String> {
let _ = caller_pid;
self.dispatch(name, input, config, attempt)
}
fn dispatch_async_from_process(
self: Arc<Self>,
name: String,
input: String,
config: String,
attempt: u32,
caller_pid: Option<u64>,
) -> BoxFuture<'static, Result<String, String>> {
Box::pin(async move {
let blocking = tokio::task::spawn_blocking(move || {
self.dispatch_from_process(&name, &input, &config, attempt, caller_pid)
});
match blocking.await {
Ok(result) => result,
Err(join_error) => Err(format!("activity dispatch task failed: {join_error}")),
}
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::ActivityDispatcher;
use crate::runtime::EngineNifState;
struct Echo;
impl ActivityDispatcher for Echo {
fn dispatch(
&self,
_name: &str,
input: &str,
_config: &str,
_attempt: u32,
) -> Result<String, String> {
Ok(input.to_owned())
}
}
#[test]
fn dispatcher_is_accessible_after_install_on_engine_state() {
let state = EngineNifState::default();
state.set_activity_dispatcher(Arc::new(Echo));
let dispatcher = state.activity_dispatcher();
assert!(dispatcher.is_some());
assert_eq!(
dispatcher
.as_ref()
.and_then(|d| d.dispatch("test", "hello", "{}", 1).ok()),
Some("hello".to_owned())
);
}
}