nodata 0.1.0

nodata is a Kafka-like message broker that is simple and easy to use, while relying on either local or S3-like data storage for consistency.
mod broker;
mod dagger_engine;
mod grpc;
mod grpc_component;
mod http;
mod state;

mod component;
mod services;

use std::net::SocketAddr;

use anyhow::Context;
use broker::Broker;
use clap::{Parser, Subcommand};
use grpc::{GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use grpc_component::GrpcComponentClient;
use http::HttpServer;
use notmad::Mad;
use state::SharedState;
use tonic::transport::{Channel, ClientTlsConfig};

// Top-level command-line definition, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help/about text, which would change the CLI output.
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
struct Command {
    // The selected subcommand. Typed as `Option` even though
    // `subcommand_required = true` is set on the struct above.
    #[command(subcommand)]
    command: Option<Commands>,
}

// Top-level subcommands of the binary.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help text, which would change the CLI output.
#[derive(Subcommand)]
enum Commands {
    // Runs the service: `main` starts the broker, the HTTP server and the gRPC
    // server with the addresses given below.
    Serve {
        // Socket address handed to the HTTP server. Read from `SERVICE_HOST` or
        // `--service-host`; defaults to 127.0.0.1:3000.
        #[arg(
            env = "SERVICE_HOST",
            long = "service-host",
            default_value = "127.0.0.1:3000"
        )]
        host: SocketAddr,
        // Socket address handed to the gRPC server. Read from `SERVICE_GRPC_HOST`
        // or `--service-grpc-host`; defaults to 127.0.0.1:7900.
        #[arg(
            env = "SERVICE_GRPC_HOST",
            long = "service-grpc-host",
            default_value = "127.0.0.1:7900"
        )]
        grpc_host: SocketAddr,
    },

    // Runs a single client command against a gRPC endpoint.
    Client {
        // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
        //host: String,
        // Endpoint passed to `create_client`. Unlike `Serve::grpc_host` this is a
        // string that includes the scheme (the default is an `http://` URL), not a
        // bare socket address, although both share the `SERVICE_GRPC_HOST` variable.
        #[arg(
            env = "SERVICE_GRPC_HOST",
            long = "service-grpc-host",
            default_value = "http://127.0.0.1:7900"
        )]
        grpc_host: String,

        // The client operation to perform.
        #[command(subcommand)]
        commands: ClientCommands,
    },
}

// Operations available under the `client` subcommand.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn
// `///` doc comments into help text, which would change the CLI output.
#[derive(Subcommand)]
enum ClientCommands {
    // Publishes a single event to `topic`; the bytes of `value` are sent as the
    // payload.
    PublishEvent {
        #[arg(long)]
        topic: String,
        #[arg(long)]
        value: String,
    },
    // Load generator: `main` spawns `threads` tokio tasks, each with its own
    // client, that publish zero-filled payloads of `size` bytes to `topic` in an
    // endless loop.
    PublishEvents {
        #[arg(long)]
        topic: String,
        // Payload size in bytes for every published event.
        #[arg(long)]
        size: usize,
        // Number of concurrent publisher tasks (tokio tasks, not OS threads).
        #[arg(long)]
        threads: usize,
    },
    // Fetches the topic list and prints one topic per line.
    GetTopics {},
    // Subscribes to `topic` and prints every message received on the stream.
    SubscribeTopic {
        #[arg(long)]
        topic: String,
    },
    // Creates a `GrpcComponentClient` for `host` and calls `ping` on it. This
    // command uses its own `host` argument rather than the client-level
    // `grpc_host`.
    ComponentPing {
        #[arg(long)]
        host: String,
    },
}

/// Entry point: parses the command line and either runs the service (broker,
/// HTTP server and gRPC server) or executes one client command against a gRPC
/// endpoint.
///
/// # Errors
///
/// Returns an error when shared state cannot be created, when any of the
/// service components fails, when a client cannot be created, or when a gRPC
/// call fails. For `PublishEvents` the first failure of any publisher task is
/// returned.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // A missing `.env` file is not an error; environment variables are optional.
    dotenv::dotenv().ok();
    tracing_subscriber::fmt::init();

    let cli = Command::parse();
    // clap already rejects invocations without a subcommand
    // (`subcommand_required = true`), so `None` is not expected here. Report it
    // as an error instead of panicking should that invariant ever change.
    let command = cli.command.context("no subcommand provided")?;

    match command {
        Commands::Serve { host, grpc_host } => {
            tracing::info!("Starting service");
            let state = SharedState::new().await?;
            Mad::builder()
                .add(Broker::new(&state))
                .add(HttpServer::new(&state, host))
                .add(GrpcServer::new(&state, grpc_host))
                .run()
                .await?;
        }
        Commands::Client {
            commands,
            grpc_host,
        } => match commands {
            ClientCommands::PublishEvent { topic, value } => {
                let mut client = create_client(grpc_host).await?;

                client
                    .publish_event(PublishEventRequest {
                        topic,
                        value: value.into_bytes(),
                    })
                    .await?;
            }
            ClientCommands::PublishEvents {
                topic,
                size,
                threads,
            } => {
                // A `JoinSet` yields tasks in completion order. Every publisher
                // loops forever, so awaiting the handles one by one in spawn
                // order would never observe a failure in any task but the first.
                let mut publishers = tokio::task::JoinSet::new();
                for _ in 0..threads {
                    let topic = topic.clone();
                    let grpc_host = grpc_host.clone();
                    publishers.spawn(async move {
                        let mut client = create_client(grpc_host).await?;

                        // Publishes without throttling until a call fails.
                        loop {
                            client
                                .publish_event(PublishEventRequest {
                                    topic: topic.clone(),
                                    value: vec![0; size],
                                })
                                .await?;
                        }

                        // Never reached; only pins the error type of the async
                        // block so that `?` above can be used.
                        #[allow(unreachable_code)]
                        Ok::<(), anyhow::Error>(())
                    });
                }

                // The first `?` surfaces a panicked/cancelled task, the second
                // the publisher's own error. Returning drops the set, which
                // aborts the remaining publishers.
                while let Some(outcome) = publishers.join_next().await {
                    outcome??;
                }
            }
            ClientCommands::GetTopics {} => {
                let mut client = create_client(grpc_host).await?;

                println!("Listing topics");
                let topics = client.get_topics(GetTopicsRequest {}).await?;

                for topic in topics.into_inner().topics {
                    println!("{topic}");
                }
            }
            ClientCommands::SubscribeTopic { topic } => {
                let mut client = create_client(grpc_host).await?;

                println!("listening for events in topic: {}", topic);

                let resp = client.subscribe(SubscribeRequest { topic }).await?;

                let mut stream = resp.into_inner();
                while let Some(msg) = stream.message().await? {
                    // Payloads are arbitrary bytes. Decode lossily so that
                    // non-UTF-8 messages remain visible instead of being printed
                    // as an empty string.
                    println!(
                        "msg (published={}): {}",
                        msg.published.unwrap_or_default(),
                        String::from_utf8_lossy(&msg.value),
                    )
                }
            }
            ClientCommands::ComponentPing { host } => {
                tracing::info!(host = host, "creating client");
                let client = GrpcComponentClient::new(&host).await?;

                tracing::info!(host = host, "sending ping");
                client.ping().await?;
            }
        },
    }

    Ok(())
}

/// Connects to the gRPC endpoint at `grpc_host` and returns a service client
/// bound to the resulting channel.
///
/// TLS with the platform's native root certificates is enabled when
/// `grpc_host` starts with `https`; otherwise the connection is made without
/// TLS configuration.
///
/// # Errors
///
/// Returns an error when `grpc_host` is not a valid URI, when the TLS
/// configuration is rejected, or when the connection cannot be established.
async fn create_client(
    grpc_host: String,
) -> anyhow::Result<
    crate::grpc::no_data_service_client::NoDataServiceClient<tonic::transport::Channel>,
> {
    let use_tls = grpc_host.starts_with("https");

    // `from_shared` only parses the URI; nothing is connected yet, so a failure
    // here means the host string itself is malformed.
    let mut endpoint = Channel::from_shared(grpc_host.clone())
        .with_context(|| format!("invalid grpc host: {grpc_host}"))?;

    if use_tls {
        endpoint = endpoint
            .tls_config(ClientTlsConfig::new().with_native_roots())
            .with_context(|| format!("failed to configure TLS for: {grpc_host}"))?;
    }

    // `with_context` builds the message only on failure, unlike
    // `context(format!(..))` which allocates on the success path as well.
    let channel = endpoint
        .connect()
        .await
        .with_context(|| format!("failed to connect to: {grpc_host}"))?;

    Ok(crate::grpc::no_data_service_client::NoDataServiceClient::new(channel))
}