use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::{
Mutex, broadcast,
mpsc::{self, UnboundedReceiver, UnboundedSender, unbounded_channel},
oneshot,
};
use crate::{
channel_client::{ChannelClient, ChannelClientType},
control_client::ControlClient,
jsonrpc::{Request, Response},
message::{
CapabilitiesResult, ChannelCapabilities, ChannelMessage, ChannelState, DrainResult,
InitParams, InitResult, ListKeysResult, MessageInResult, MessageOutParams,
MessageOutResult, NameResult, StateResult, StopResult, WaitUntilDrainedParams,
WaitUntilDrainedResult,
},
plugin_actor::PluginHandle,
plugin_runtime::{HasStore, PluginHandler},
};
pub struct MockChannel {
in_rx: Arc<Mutex<Option<UnboundedReceiver<ChannelMessage>>>>,
in_tx: UnboundedSender<ChannelMessage>,
outgoing: Arc<Mutex<Vec<ChannelMessage>>>,
state: Arc<Mutex<ChannelState>>,
config: DashMap<String, String>,
secrets: DashMap<String, String>,
}
impl Clone for MockChannel {
fn clone(&self) -> Self {
Self {
in_tx: self.in_tx.clone(),
in_rx: Arc::clone(&self.in_rx),
outgoing: Arc::clone(&self.outgoing),
state: self.state.clone(),
config: self.config.clone(),
secrets: self.secrets.clone(),
}
}
}
impl MockChannel {
pub fn new() -> Self {
let (in_tx, in_rx) = unbounded_channel();
Self {
in_tx,
in_rx: Arc::new(Mutex::new(Some(in_rx))),
outgoing: Arc::new(Mutex::new(vec![])),
state: Arc::new(Mutex::new(ChannelState::STOPPED)),
config: DashMap::new(),
secrets: DashMap::new(),
}
}
pub async fn inject(&self, msg: ChannelMessage) {
let _ = self.in_tx.send(msg);
}
pub async fn sent_messages(&self) -> Vec<ChannelMessage> {
self.outgoing.lock().await.iter().cloned().collect()
}
}
#[async_trait]
impl ChannelClientType for Arc<MockChannel> {
async fn send(&mut self, msg: ChannelMessage) -> anyhow::Result<()> {
self.outgoing.lock().await.push(msg);
Ok(())
}
async fn next_inbound(&mut self) -> Option<ChannelMessage> {
let mut guard = self.in_rx.lock().await;
let rx = guard
.as_mut()
.expect("next_inbound already taken by another instance");
rx.recv().await
}
}
impl HasStore for Arc<MockChannel> {
fn config_store(&self) -> &DashMap<String, String> {
&self.config
}
fn secret_store(&self) -> &DashMap<String, String> {
&self.secrets
}
}
#[async_trait]
impl PluginHandler for Arc<MockChannel> {
fn name(&self) -> NameResult {
NameResult {
name: "mock".into(),
}
}
fn capabilities(&self) -> CapabilitiesResult {
CapabilitiesResult {
capabilities: ChannelCapabilities {
name: "mock".to_string(),
version: "1".to_string(),
supports_sending: true,
supports_receiving: true,
supports_text: true,
supports_files: false,
supports_media: false,
supports_events: false,
supports_typing: false,
supports_threading: false,
supports_routing: false,
supports_reactions: false,
supports_call: false,
supports_buttons: false,
supports_links: false,
supports_custom_payloads: false,
channel_data_schema_id: None,
channel_data_schema: None,
supported_events: vec![],
},
}
}
fn list_config_keys(&self) -> ListKeysResult {
ListKeysResult {
required_keys: vec![],
optional_keys: vec![],
dynamic_keys: vec![],
}
}
fn list_secret_keys(&self) -> ListKeysResult {
ListKeysResult {
required_keys: vec![],
optional_keys: vec![],
dynamic_keys: vec![],
}
}
async fn state(&self) -> StateResult {
StateResult {
state: *self.state.lock().await,
}
}
async fn init(&mut self, _params: InitParams) -> InitResult {
*self.state.lock().await = ChannelState::RUNNING;
InitResult {
success: true,
error: None,
}
}
async fn drain(&mut self) -> DrainResult {
*self.state.lock().await = ChannelState::DRAINING;
DrainResult {
success: true,
error: None,
}
}
async fn wait_until_drained(&self, _params: WaitUntilDrainedParams) -> WaitUntilDrainedResult {
WaitUntilDrainedResult {
stopped: true,
error: false,
}
}
async fn stop(&mut self) -> StopResult {
*self.state.lock().await = ChannelState::STOPPED;
StopResult {
success: true,
error: None,
}
}
async fn send_message(&mut self, params: MessageOutParams) -> MessageOutResult {
self.outgoing.lock().await.push(params.message);
MessageOutResult {
success: true,
error: None,
}
}
async fn receive_message(&mut self) -> MessageInResult {
let mut guard = self.in_rx.lock().await;
let rx = guard
.as_mut()
.expect("receive_message already taken by another instance");
match rx.recv().await {
Some(msg) => MessageInResult {
message: msg,
error: false,
},
None => MessageInResult {
message: Default::default(),
error: true,
},
}
}
}
pub async fn spawn_mock_handle() -> (Arc<MockChannel>, PluginHandle) {
use crate::plugin_actor::handle_internal_request;
let (rpc_tx, mut rpc_rx) = mpsc::channel::<(Request, oneshot::Sender<Response>)>(8);
let (msg_tx, _) = broadcast::channel::<ChannelMessage>(8);
let mock = Arc::new(MockChannel::new());
let mut plugin = mock.clone(); let mut plugin2 = mock.clone();
{
tokio::spawn(async move {
while let Some((req, tx_rsp)) = rpc_rx.recv().await {
let resp = handle_internal_request(&mut plugin, req).await;
let _ = tx_rsp.send(resp);
}
});
}
{
let msg_tx_clone = msg_tx.clone();
tokio::spawn(async move {
while let Some(msg) = plugin2.next_inbound().await {
let _ = msg_tx_clone.send(msg);
}
});
}
let channel_client = ChannelClient::new(rpc_tx.clone(), msg_tx.clone());
let control_client = ControlClient::new(rpc_tx.clone());
let handle = PluginHandle::new(channel_client, control_client).await;
(mock, handle)
}
#[cfg(any(test))]
mod tests {
#[cfg(test)]
use super::*;
#[cfg(test)]
use crate::message::{MessageContent, Participant};
#[cfg(test)]
use chrono::Utc;
#[tokio::test]
async fn capabilities_and_metadata() {
let (_mock, handle) = spawn_mock_handle().await;
let name = handle.name();
assert_eq!(name, "mock");
let caps = handle.capabilities();
assert_eq!(caps.name, "mock");
assert!(caps.supports_sending && caps.supports_receiving);
let cfg = handle.list_config_keys();
let secr = handle.list_secret_keys();
assert!(cfg.required_keys.is_empty() && cfg.optional_keys.is_empty());
assert!(secr.required_keys.is_empty() && secr.optional_keys.is_empty());
}
#[tokio::test]
async fn state_lifecycle_and_health() {
let (_mock, handle) = spawn_mock_handle().await;
let mut state = handle.state().await.expect("state call");
assert_eq!(state, ChannelState::STOPPED);
let res = handle.start(InitParams::default()).await.expect("start");
assert!(res.success);
state = handle.state().await.expect("state call");
assert_eq!(state, ChannelState::RUNNING);
handle.drain().await.expect("drain");
handle
.wait_until_drained(250)
.await
.expect("waitUntilDrained");
handle.stop().await.expect("stop");
state = handle.state().await.expect("state call");
assert_eq!(state, ChannelState::STOPPED);
let health = handle.health().await.expect("health");
assert!(health.healthy);
}
#[tokio::test]
async fn send_and_receive_roundtrip() {
let (mut mock, mut handle) = spawn_mock_handle().await;
let out_msg = ChannelMessage {
from: Participant::new("alice".to_string(), None, None),
to: vec![Participant::new("bob".to_string(), None, None)],
content: vec![MessageContent::Text {
text: "ping".into(),
}],
timestamp: Utc::now().to_rfc3339(),
..Default::default()
};
let _ = mock
.send_message(MessageOutParams {
message: out_msg.clone(),
})
.await;
assert_eq!(mock.sent_messages().await, vec![out_msg]);
let in_msg = ChannelMessage {
from: Participant::new("bob".to_string(), None, None),
to: vec![Participant::new("alice".to_string(), None, None)],
content: vec![MessageContent::Text {
text: "pong".into(),
}],
timestamp: Utc::now().to_rfc3339(),
..Default::default()
};
mock.inject(in_msg.clone()).await;
let got = handle.next_message().await.expect("next msg");
assert_eq!(got, in_msg);
}
#[tokio::test]
async fn internal_wait_until_drained_path() {
let (mock, handle) = spawn_mock_handle().await;
handle.wait_until_drained(100).await.expect("drained");
let _ = mock.state;
}
}