use async_stream::stream;
use futures::Stream;
use plexus_core::plexus::DynamicHub;
use plexus_rpc::transport::TransportServer;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum EchoEvent {
Echo { message: String, count: u32 },
}
#[derive(Clone)]
pub struct Echo;
#[plexus_macros::activation(
namespace = "echo",
version = "1.0.0",
description = "Echo messages back"
)]
impl Echo {
#[plexus_macros::method]
async fn echo(
&self,
message: String,
count: u32,
) -> impl Stream<Item = EchoEvent> + Send + 'static {
stream! {
for i in 0..count {
yield EchoEvent::Echo {
message: message.clone(),
count: i + 1,
};
}
}
}
#[plexus_macros::method]
async fn once(
&self,
message: String,
) -> impl Stream<Item = EchoEvent> + Send + 'static {
stream! {
yield EchoEvent::Echo { message, count: 1 };
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let hub = Arc::new(DynamicHub::new("echo").register(Echo));
let rpc_converter = |arc| {
DynamicHub::arc_into_rpc_module(arc)
.map_err(|e| anyhow::anyhow!("rpc module: {e}"))
};
println!("plexus-rpc echo example");
println!(" ws://127.0.0.1:4444 — synapse will discover this on its default port");
println!(" $ synapse echo echo --message hello --count 3");
println!(" Ctrl-C to stop.");
TransportServer::builder(hub, rpc_converter)
.with_websocket(4444)
.build()
.await?
.serve()
.await
}