use anyhow::Result;
use clap::{Parser, Subcommand};
use fuse_rule::config::{FuseRuleConfig, SourceConfig};
use fuse_rule::ingestion::{KafkaIngestion, WebSocketIngestion};
use fuse_rule::server::FuseRuleServer;
use fuse_rule::RuleEngine;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
// Top-level command-line interface for the `fuserule` binary.
//
// NOTE(review): `//` comments are used here deliberately — clap's derive
// macro turns `///` doc comments into `--help` text, so doc comments on
// this struct or its fields would change the program's runtime help output.
// Help strings are instead supplied via the #[command]/#[arg] attributes.
#[derive(Parser)]
#[command(name = "fuserule")]
#[command(about = "High-performance Arrow-based Rule Engine", long_about = None)]
struct Cli {
    // Which subcommand to execute; see `Commands` for the available modes.
    #[command(subcommand)]
    command: Commands,
}
// All subcommands understood by the `fuserule` binary.
//
// NOTE(review): `//` comments are intentional — clap's derive turns `///`
// doc comments into subcommand/argument help text, which would alter the
// program's `--help` output at runtime.
#[derive(Subcommand)]
enum Commands {
    // Run the long-lived daemon: rule engine, HTTP server, and any
    // ingestion sources listed in the config file.
    Run {
        // Path to the YAML configuration file.
        #[arg(short, long, default_value = "fuse_rule_config.yaml")]
        config: String,
        // TCP port the HTTP server binds to.
        #[arg(short, long, default_value_t = 3030)]
        port: u16,
    },
    // Validate the configuration file and, optionally, a single predicate.
    Validate {
        #[arg(short, long, default_value = "fuse_rule_config.yaml")]
        config: String,
        // Optional predicate expression to validate alongside the config.
        #[arg(short, long)]
        predicate: Option<String>,
    },
    // Start an interactive REPL backed by a live engine instance.
    Repl {
        #[arg(short, long, default_value = "fuse_rule_config.yaml")]
        config: String,
    },
    // Start the interactive rule debugger (only the engine's schema is
    // needed; no long-lived engine is kept).
    Debug {
        #[arg(short, long, default_value = "fuse_rule_config.yaml")]
        config: String,
    },
}
/// Binary entry point: initializes tracing, parses CLI args, and dispatches
/// to the selected subcommand. Returns any error produced by the subcommand.
#[tokio::main]
async fn main() -> Result<()> {
    // Install the tracing subscriber first so every subcommand gets logging.
    tracing_subscriber::fmt::init();
    let args = Cli::parse();
    match args.command {
        Commands::Validate { config, predicate } => {
            fuse_rule::cli::validate_rule(&config, predicate.as_deref()).await?;
        }
        Commands::Repl { config } => {
            println!("🔥 Starting FuseRule REPL...");
            let config_data = FuseRuleConfig::from_file(&config)?;
            // `config_data` is not needed after engine construction, so move
            // it instead of cloning (fixes a redundant clone).
            let engine = RuleEngine::from_config(config_data).await?;
            let shared_engine = Arc::new(RwLock::new(engine));
            let schema = shared_engine.read().await.schema();
            let mut repl = fuse_rule::repl::Repl::new(shared_engine, schema);
            repl.run().await?;
        }
        Commands::Debug { config } => {
            println!("🐛 Starting FuseRule Debugger...");
            let config_data = FuseRuleConfig::from_file(&config)?;
            // Only the schema is used here; the temporary engine is dropped
            // immediately, so the config can be moved rather than cloned.
            let schema = RuleEngine::from_config(config_data)
                .await
                .map(|e| e.schema())?;
            let mut debugger = fuse_rule::debugger::RuleDebugger::new(schema);
            debugger.run().await?;
        }
        Commands::Run { config, port } => run_daemon(config, port).await?,
    }
    Ok(())
}

/// Runs the daemon mode: builds the engine from the config at `config_path`,
/// spawns one task per configured ingestion source plus the HTTP server on
/// `port`, then waits until either the server task or all source tasks finish.
///
/// # Errors
/// Returns an error if the config cannot be read/parsed, the engine cannot be
/// constructed, or awaiting the server task fails (e.g. the task panicked).
async fn run_daemon(config_path: String, port: u16) -> Result<()> {
    println!("🔥 Initializing FuseRule Daemon...");
    let config_data = FuseRuleConfig::from_file(&config_path)?;
    // `config_data.sources` is iterated after engine construction, so the
    // engine takes a clone here; the original stays usable below.
    let engine = RuleEngine::from_config(config_data.clone()).await?;
    let rate_limit = config_data.engine.ingest_rate_limit;
    let api_keys = config_data.engine.api_keys.clone();
    let shared_engine = Arc::new(RwLock::new(engine));
    // Move `config_path` into the server — it is not used again, so the
    // previous `config.to_string()` re-allocation was unnecessary.
    let server = FuseRuleServer::new(shared_engine.clone(), config_path, rate_limit, api_keys);

    // One spawned task per ingestion source; errors are logged, not fatal.
    let mut source_handles = Vec::with_capacity(config_data.sources.len());
    for source in &config_data.sources {
        match source {
            SourceConfig::Kafka {
                brokers,
                topic,
                group_id,
                auto_commit,
            } => {
                info!(
                    "Starting Kafka ingestion: topic={}, group_id={}",
                    topic, group_id
                );
                let kafka = KafkaIngestion::new(
                    shared_engine.clone(),
                    brokers.clone(),
                    topic.clone(),
                    group_id.clone(),
                    *auto_commit,
                );
                source_handles.push(tokio::spawn(async move {
                    if let Err(e) = kafka.run().await {
                        tracing::error!(error = %e, "Kafka ingestion error");
                    }
                }));
            }
            SourceConfig::WebSocket {
                bind,
                max_connections,
            } => {
                info!(
                    "Starting WebSocket ingestion: bind={}, max_connections={}",
                    bind, max_connections
                );
                let ws = WebSocketIngestion::new(
                    shared_engine.clone(),
                    bind.clone(),
                    *max_connections,
                );
                source_handles.push(tokio::spawn(async move {
                    if let Err(e) = ws.run().await {
                        tracing::error!(error = %e, "WebSocket ingestion error");
                    }
                }));
            }
        }
    }

    let server_handle = tokio::spawn(async move {
        if let Err(e) = server.run(port).await {
            tracing::error!(error = %e, "HTTP server error");
        }
    });

    if source_handles.is_empty() {
        info!("No ingestion sources configured, waiting for server...");
        server_handle.await?;
    } else {
        // Exit when either the server stops or every ingestion source has
        // finished — whichever happens first.
        tokio::select! {
            _ = server_handle => {
                info!("Server task completed");
            },
            _ = async {
                for handle in source_handles {
                    let _ = handle.await;
                }
            } => {
                info!("All ingestion sources completed");
            },
        }
    }
    Ok(())
}