mod broker;
mod dagger_engine;
mod grpc;
mod grpc_component;
mod http;
mod state;
mod component;
mod services;
use std::net::SocketAddr;
use anyhow::Context;
use broker::Broker;
use clap::{Parser, Subcommand};
use grpc::{GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use grpc_component::GrpcComponentClient;
use http::HttpServer;
use notmad::Mad;
use state::SharedState;
use tonic::transport::{Channel, ClientTlsConfig};
// Top-level command-line definition for the binary, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap-derive picks up `///`
// doc comments on these items as help/about text, so adding them would change the
// generated `--help` output.
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
struct Command {
// The selected subcommand. The field is an `Option`, but `subcommand_required = true`
// above asks clap to reject invocations that do not name a subcommand.
#[command(subcommand)]
command: Option<Commands>,
}
// Top-level subcommands: run the service, or act as a client against a running one.
//
// NOTE: plain `//` comments are used on purpose; `///` doc comments on clap-derive
// variants and fields become user-visible help text.
#[derive(Subcommand)]
enum Commands {
// Start the service. `main` registers the broker, the HTTP server and the gRPC
// server with `Mad` and runs them.
Serve {
// Socket address passed to `HttpServer::new`.
// Read from `--service-host` or the `SERVICE_HOST` environment variable.
#[arg(
env = "SERVICE_HOST",
long = "service-host",
default_value = "127.0.0.1:3000"
)]
host: SocketAddr,
// Socket address passed to `GrpcServer::new`.
// Read from `--service-grpc-host` or the `SERVICE_GRPC_HOST` environment variable.
#[arg(
env = "SERVICE_GRPC_HOST",
long = "service-grpc-host",
default_value = "127.0.0.1:7900"
)]
grpc_host: SocketAddr,
},
// Run a single client operation (see `ClientCommands`).
Client {
// Endpoint URI passed to `create_client`; a value starting with `https` makes
// `create_client` configure TLS.
// NOTE(review): this shares the `SERVICE_GRPC_HOST` variable and flag name with
// `Serve`, but here the value is a URI string (default includes the `http://`
// scheme) while `Serve` parses a bare `SocketAddr` -- confirm one environment
// value is never expected to serve both modes.
#[arg(
env = "SERVICE_GRPC_HOST",
long = "service-grpc-host",
default_value = "http://127.0.0.1:7900"
)]
grpc_host: String,
// The client operation to perform.
#[command(subcommand)]
commands: ClientCommands,
},
}
// Operations available under the `Client` subcommand; each one is dispatched in `main`.
//
// NOTE: plain `//` comments are used on purpose; `///` doc comments on clap-derive
// variants and fields become user-visible help text.
#[derive(Subcommand)]
enum ClientCommands {
// Publish a single event.
PublishEvent {
// Topic to publish to.
#[arg(long)]
topic: String,
// Payload; `main` sends the string's bytes as the event value.
#[arg(long)]
value: String,
},
// Publish events in an endless loop from several concurrent tasks.
PublishEvents {
// Topic to publish to.
#[arg(long)]
topic: String,
// Payload length: each event value is `size` zero bytes.
#[arg(long)]
size: usize,
// Number of publisher tasks `main` spawns (tokio tasks, each with its own client).
#[arg(long)]
threads: usize,
},
// Print the topics returned by the service, one per line.
GetTopics {},
// Subscribe to a topic and print every message received on the stream.
SubscribeTopic {
// Topic to subscribe to.
#[arg(long)]
topic: String,
},
// Send a ping through `GrpcComponentClient`.
ComponentPing {
// Address passed to `GrpcComponentClient::new`. This is separate from the
// `grpc_host` argument of the `Client` subcommand, which this command does not use.
#[arg(long)]
host: String,
},
}
/// Program entry point: parses the command line and either runs the service
/// (`serve`) or performs a single client operation (`client ...`).
///
/// # Errors
///
/// Returns an error if shared state cannot be created, if any of the service
/// components fails, if a gRPC connection cannot be established, or if a
/// client request fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Errors from loading a `.env` file (for example, no such file) are ignored.
    dotenv::dotenv().ok();
    tracing_subscriber::fmt::init();

    let cli = Command::parse();
    // `subcommand_required = true` means clap should never hand us `None`;
    // report it as a regular error instead of panicking if it ever does.
    let command = cli.command.context("a subcommand is required")?;

    match command {
        Commands::Serve { host, grpc_host } => {
            tracing::info!("Starting service");
            let state = SharedState::new().await?;
            Mad::builder()
                .add(Broker::new(&state))
                .add(HttpServer::new(&state, host))
                .add(GrpcServer::new(&state, grpc_host))
                .run()
                .await?;
        }
        Commands::Client {
            commands,
            grpc_host,
        } => match commands {
            ClientCommands::PublishEvent { topic, value } => {
                let mut client = create_client(grpc_host).await?;
                client
                    .publish_event(PublishEventRequest {
                        topic,
                        value: value.into_bytes(),
                    })
                    .await?;
            }
            ClientCommands::PublishEvents {
                topic,
                size,
                threads,
            } => {
                // A `JoinSet` yields tasks in completion order. Every worker loops
                // forever, so awaiting the handles one by one in spawn order would
                // hide a failure in any worker other than the first.
                let mut workers = tokio::task::JoinSet::new();
                for _ in 0..threads {
                    let topic = topic.clone();
                    let grpc_host = grpc_host.clone();
                    workers.spawn(async move {
                        let mut client = create_client(grpc_host).await?;
                        // Publishes until a request fails; `?` is the only exit.
                        loop {
                            client
                                .publish_event(PublishEventRequest {
                                    topic: topic.clone(),
                                    value: vec![0; size],
                                })
                                .await?;
                        }
                        // Never reached; present only to fix the block's output type.
                        #[allow(unreachable_code)]
                        Ok::<(), anyhow::Error>(())
                    });
                }
                // The first worker error (or panic) is propagated; returning drops
                // the set, which aborts the remaining workers.
                while let Some(joined) = workers.join_next().await {
                    joined??;
                }
            }
            ClientCommands::GetTopics {} => {
                let mut client = create_client(grpc_host).await?;
                println!("Listing topics");
                let topics = client.get_topics(GetTopicsRequest {}).await?;
                for topic in topics.into_inner().topics {
                    println!("{topic}");
                }
            }
            ClientCommands::SubscribeTopic { topic } => {
                let mut client = create_client(grpc_host).await?;
                println!("listening for events in topic: {}", topic);
                let resp = client.subscribe(SubscribeRequest { topic }).await?;
                let mut stream = resp.into_inner();
                // Runs until the server closes the stream or a receive error occurs.
                while let Some(msg) = stream.message().await? {
                    // A payload that is not valid UTF-8 is printed as an empty string.
                    println!(
                        "msg (published={}): {}",
                        msg.published.unwrap_or_default(),
                        std::str::from_utf8(&msg.value).unwrap_or_default(),
                    )
                }
            }
            ClientCommands::ComponentPing { host } => {
                tracing::info!(host = host, "creating client");
                let client = GrpcComponentClient::new(&host).await?;
                tracing::info!(host = host, "sending ping");
                client.ping().await?;
            }
        },
    }

    Ok(())
}
/// Connects to the service's gRPC endpoint and returns a ready-to-use client.
///
/// `grpc_host` is the endpoint URI. When it starts with `https`, the channel is
/// configured for TLS using the platform's native root certificates; otherwise
/// no TLS configuration is applied.
///
/// # Errors
///
/// Returns an error if `grpc_host` is not accepted as an endpoint URI, if the
/// TLS configuration is rejected, or if the connection attempt fails.
async fn create_client(
    grpc_host: String,
) -> anyhow::Result<
    crate::grpc::no_data_service_client::NoDataServiceClient<tonic::transport::Channel>,
> {
    // `with_context` builds the message only on failure, unlike `context(format!(..))`.
    let mut endpoint = Channel::from_shared(grpc_host.clone())
        .with_context(|| format!("failed to connect to: {}", &grpc_host))?;

    // Both schemes share the same connect path; only TLS setup differs.
    if grpc_host.starts_with("https") {
        endpoint = endpoint.tls_config(ClientTlsConfig::new().with_native_roots())?;
    }

    let channel = endpoint
        .connect()
        .await
        .with_context(|| format!("failed to connect to: {}", &grpc_host))?;

    Ok(crate::grpc::no_data_service_client::NoDataServiceClient::new(channel))
}