use anyhow::Result;
use axum::{
Router,
extract::{Query, State},
response::IntoResponse,
routing::get,
};
use chrono::{Local, TimeZone};
use clap::Parser;
use rustpbx::sipflow::{
protocol::{Packet, parse_packet},
storage::{StorageManager, process_packet},
};
use rustpbx::{config::SipFlowSubdirs, sipflow::wav_utils::generate_wav_from_packets_ex};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
// Command-line options for the SipFlow recording server, parsed by clap.
// NOTE: plain `//` comments are used on purpose; `///` doc comments on a clap
// derive struct are turned into help text and would change `--help` output.
#[derive(Parser, Debug)]
#[command(author, version, about = "SipFlow - SIP and RTP flow recording server", long_about = None)]
struct Args {
// Address the UDP capture socket binds to (combined with `port` in `main`).
#[arg(short, long, default_value = "0.0.0.0")]
addr: String,
// UDP port on which packets are received.
#[arg(short, long, default_value_t = 3000)]
port: u16,
// TCP port of the HTTP query API; `main` binds it on 0.0.0.0.
#[arg(long, default_value_t = 3001)]
http_port: u16,
// Storage root directory; created at startup if missing and handed to `StorageManager::new`.
#[arg(short, long, default_value = "./config/sipflow")]
root: String,
// Forwarded to `StorageManager::new`; presumably the number of buffered
// records that triggers a flush — TODO confirm against StorageManager.
#[arg(long, default_value_t = 1000)]
flush_count: usize,
// Forwarded to `StorageManager::new`; presumably a flush period (unit not
// visible here, likely seconds) — TODO confirm against StorageManager.
#[arg(long, default_value_t = 5)]
flush_interval: u64,
// Capacity of the mpsc channel between the UDP receiver task and the storage worker.
#[arg(long, default_value_t = 100000)]
buffer_size: usize,
}
// State shared with every HTTP handler through axum's `State` extractor.
#[derive(Clone)]
struct AppState {
// Storage manager shared between the HTTP handlers and the background
// storage worker; the async mutex serialises access to it.
storage: Arc<Mutex<StorageManager>>,
}
/// Starts the SipFlow recorder.
///
/// Sets up three cooperating pieces:
/// 1. a UDP receiver task that parses incoming datagrams and queues them,
/// 2. a storage worker task that persists queued packets and periodically
///    asks the storage manager whether a flush is due,
/// 3. an HTTP server exposing `/health`, `/flow` and `/media`.
///
/// # Errors
/// Returns an error if the storage root cannot be created, the UDP address
/// cannot be parsed, either socket cannot be bound, or the HTTP server stops
/// with an error.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt::init();
    let args = Args::parse();

    std::fs::create_dir_all(&args.root)?;
    let storage = Arc::new(Mutex::new(StorageManager::new(
        std::path::Path::new(&args.root),
        args.flush_count,
        args.flush_interval,
        // NOTE(review): the meaning of this constant is defined by
        // `StorageManager::new`, which is not visible from this file.
        1024,
        SipFlowSubdirs::None,
    )));
    let app_state = AppState {
        storage: storage.clone(),
    };

    // Parse the address as a bare IP first so that IPv6 literals such as `::1`
    // work; `"::1:3000"` is not a valid socket address string. Fall back to the
    // original `addr:port` string form so inputs like `[::1]` keep working.
    let udp_addr: SocketAddr = match args.addr.parse::<std::net::IpAddr>() {
        Ok(ip) => SocketAddr::new(ip, args.port),
        Err(_) => format!("{}:{}", args.addr, args.port).parse::<SocketAddr>()?,
    };
    let socket = UdpSocket::bind(udp_addr).await?;
    tracing::info!("UDP server listening on {}", udp_addr);

    let (tx, mut rx) = tokio::sync::mpsc::channel::<Packet>(args.buffer_size);

    // UDP receiver: never blocks on the channel, so the socket keeps draining.
    rustpbx::utils::spawn(async move {
        let mut buf = vec![0u8; 65535];
        loop {
            match socket.recv_from(&mut buf).await {
                Ok((size, _)) => {
                    if let Ok(packet) = parse_packet(&buf[..size]) {
                        // Best effort: when the queue is full the packet is
                        // dropped rather than stalling the receive loop.
                        let _ = tx.try_send(packet);
                    }
                }
                Err(e) => {
                    tracing::error!("UDP recv error: {}", e);
                }
            }
        }
    });

    // Storage worker: persists packets and checks once a second for a flush.
    let storage_worker = storage.clone();
    rustpbx::utils::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
        loop {
            tokio::select! {
                Some(packet) = rx.recv() => {
                    // Do the CPU work before taking the lock to keep it short.
                    let processed = process_packet(packet);
                    let mut mg = storage_worker.lock().await;
                    if let Err(e) = mg.write_processed(processed).await {
                        tracing::warn!("Failed to store packet: {}", e);
                    }
                }
                _ = interval.tick() => {
                    let mut mg = storage_worker.lock().await;
                    if let Err(e) = mg.check_flush().await {
                        tracing::warn!("Storage flush check failed: {}", e);
                    }
                }
            }
        }
    });

    let app = Router::new()
        .route("/health", get(health_handler))
        .route("/flow", get(flow_handler))
        .route("/media", get(media_handler))
        .with_state(app_state);

    let http_addr = SocketAddr::from(([0, 0, 0, 0], args.http_port));
    tracing::info!("HTTP server listening on {}", http_addr);
    let listener = tokio::net::TcpListener::bind(http_addr).await?;
    axum::serve(listener, app).await?;
    Ok(())
}
/// Liveness probe for `GET /health`; always answers with the plain body `OK`.
async fn health_handler() -> &'static str {
    const HEALTHY: &str = "OK";
    HEALTHY
}
/// Handles `GET /flow`: returns the recorded flow for a call as JSON.
///
/// Query parameters:
/// - `callid`: call identifier to look up (empty string when absent),
/// - `start` / `end`: Unix timestamps in seconds bounding the search window;
///   missing or unparsable values default to one hour before / after now.
///
/// Always responds with a JSON object whose `status` is `"success"` (with
/// `callid` and `flow`) or `"error"` (with `message`). Timestamps that cannot
/// be represented as a local date-time produce an error object instead of
/// panicking.
async fn flow_handler(
    State(state): State<AppState>,
    Query(params): Query<HashMap<String, String>>,
) -> axum::Json<serde_json::Value> {
    let callid = params.get("callid").cloned().unwrap_or_default();

    let now = Local::now().timestamp();
    let start_ts = params
        .get("start")
        .and_then(|s| s.parse::<i64>().ok())
        .unwrap_or(now - 3600);
    let end_ts = params
        .get("end")
        .and_then(|s| s.parse::<i64>().ok())
        .unwrap_or(now + 3600);

    // `timestamp_opt` yields no value for out-of-range seconds; since the
    // values come from the query string this must not be unwrapped.
    let (Some(start_dt), Some(end_dt)) = (
        Local.timestamp_opt(start_ts, 0).single(),
        Local.timestamp_opt(end_ts, 0).single(),
    ) else {
        return axum::Json(serde_json::json!({
            "status": "error",
            "message": "start/end timestamp out of range"
        }));
    };

    let mut mg = state.storage.lock().await;
    match mg.query_flow(&callid, start_dt, end_dt).await {
        Ok(flow) => axum::Json(serde_json::json!({
            "status": "success",
            "callid": callid,
            "flow": flow
        })),
        Err(e) => axum::Json(serde_json::json!({
            "status": "error",
            "message": e.to_string()
        })),
    }
}
/// Handles `GET /media`: returns either per-stream statistics or a WAV file
/// built from the recorded media packets of a call.
///
/// Query parameters:
/// - `callid`: call identifier to look up (empty string when absent),
/// - `start` / `end`: Unix timestamps in seconds bounding the search window;
///   missing or unparsable values default to one hour before / after now,
/// - `format=pcm` (case-insensitive): passed as the `force_pcm` flag to
///   `generate_wav_from_packets_ex`,
/// - `stats=1` or `stats=true`: return JSON statistics instead of audio.
///
/// Responses:
/// - `400` when `start`/`end` cannot be represented as a local date-time,
/// - JSON `{status, callid, stats}` in stats mode,
/// - `audio/wav` attachment otherwise, or `500` with the error text when the
///   WAV cannot be generated or the response cannot be built.
///
/// Storage query failures are treated as "no data" (empty result), matching
/// the handler's existing best-effort behaviour.
async fn media_handler(
    State(state): State<AppState>,
    Query(params): Query<HashMap<String, String>>,
) -> impl axum::response::IntoResponse {
    let callid = params.get("callid").cloned().unwrap_or_default();

    let now = Local::now().timestamp();
    let start_ts_param = params
        .get("start")
        .and_then(|s| s.parse::<i64>().ok())
        .unwrap_or(now - 3600);
    let end_ts_param = params
        .get("end")
        .and_then(|s| s.parse::<i64>().ok())
        .unwrap_or(now + 3600);
    let force_pcm = params
        .get("format")
        .map(|s| s.eq_ignore_ascii_case("pcm"))
        .unwrap_or(false);
    let stats_only = params
        .get("stats")
        .map(|s| s == "1" || s.eq_ignore_ascii_case("true"))
        .unwrap_or(false);

    // The timestamps are caller-controlled; out-of-range values must be
    // rejected rather than unwrapped.
    let (Some(start_dt), Some(end_dt)) = (
        Local.timestamp_opt(start_ts_param, 0).single(),
        Local.timestamp_opt(end_ts_param, 0).single(),
    ) else {
        return (
            axum::http::StatusCode::BAD_REQUEST,
            "start/end timestamp out of range".to_string(),
        )
            .into_response();
    };

    if stats_only {
        // Scope the guard so the lock is released before building the JSON.
        let stats = {
            let mut mg = state.storage.lock().await;
            mg.query_media_stats(&callid, start_dt, end_dt)
                .await
                .unwrap_or_default()
        };
        let stats_json: Vec<_> = stats
            .into_iter()
            .map(|stat| {
                serde_json::json!({
                    "leg": stat.leg,
                    "src": stat.src,
                    // `count` and `packet_count` carry the same value.
                    "count": stat.packet_count,
                    "packet_count": stat.packet_count,
                    "lost_packets": stat.lost_packets,
                    "expected_packets": stat.expected_packets,
                    "loss_percent": stat.loss_percent,
                    "jitter_ms": stat.jitter_ms,
                    "ssrc": stat.ssrc,
                    "payload_type": stat.payload_type,
                    "clock_rate": stat.clock_rate,
                })
            })
            .collect();
        return axum::Json(serde_json::json!({
            "status": "success",
            "callid": callid,
            "stats": stats_json
        }))
        .into_response();
    }

    let packets = {
        let mut mg = state.storage.lock().await;
        mg.query_media(&callid, start_dt, end_dt)
            .await
            .unwrap_or_default()
    };

    let wav_data = match generate_wav_from_packets_ex(&packets, force_pcm) {
        Ok(data) => data,
        Err(e) => {
            return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
                .into_response();
        }
    };

    // `callid` comes straight from the query string. Restrict it to characters
    // that are safe inside a quoted header filename so that control characters
    // or quotes can neither invalidate the header nor alter its structure.
    let safe_name: String = callid
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.' | '@') {
                c
            } else {
                '_'
            }
        })
        .collect();

    let built = axum::response::Response::builder()
        .header("Content-Type", "audio/wav")
        .header(
            "Content-Disposition",
            format!("attachment; filename=\"{}.wav\"", safe_name),
        )
        .body(axum::body::Body::from(wav_data));

    match built {
        Ok(response) => response.into_response(),
        Err(e) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
#[cfg(test)]
mod tests {
    use rustpbx::sipflow::wav_utils::generate_wav_from_packets;

    /// Builds a test packet: a 12-byte header with byte 0 set to `0x80`,
    /// byte 1 set to `payload_type`, bytes 4..8 set to the big-endian
    /// `timestamp`, followed by `payload`.
    fn rtp_packet(payload_type: u8, timestamp: u32, payload: &[u8]) -> Vec<u8> {
        let mut bytes = vec![0u8; 12];
        bytes[0] = 0x80;
        bytes[1] = payload_type;
        bytes[4..8].copy_from_slice(&timestamp.to_be_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Reads the little-endian `u16` stored at offset 20 of the WAV bytes,
    /// which the assertions below treat as the format tag.
    fn format_tag(wav: &[u8]) -> u16 {
        u16::from_le_bytes([wav[20], wav[21]])
    }

    /// Packets that all use payload type 0 must yield a WAV with format tag 7.
    #[test]
    fn test_generate_wav_pcmu_no_transcode() {
        let payload = [0x7Fu8; 160];
        let packets = vec![
            (0, 1000u64, rtp_packet(0, 1000, &payload)),
            (1, 1000u64, rtp_packet(0, 1000, &payload)),
            (0, 1160u64, rtp_packet(0, 1160, &payload)),
        ];

        let wav = generate_wav_from_packets(&packets).expect("WAV generation should succeed");

        assert_eq!(&wav[0..4], b"RIFF");
        assert_eq!(format_tag(&wav), 7);
    }

    /// Mixing payload type 0 with payload type 9 must yield a WAV with
    /// format tag 1 and a sample rate field of 16000.
    #[test]
    fn test_generate_wav_mixed_transcode() {
        let pcmu_payload = [0x7Fu8; 160];
        let g722_payload = [0u8; 160];
        let packets = vec![
            (0, 1000u64, rtp_packet(0, 0, &pcmu_payload)),
            (1, 1000u64, rtp_packet(9, 0, &g722_payload)),
        ];

        let wav = generate_wav_from_packets(&packets).expect("WAV generation should succeed");

        assert_eq!(format_tag(&wav), 1);
        let sample_rate = u32::from_le_bytes([wav[24], wav[25], wav[26], wav[27]]);
        assert_eq!(sample_rate, 16000);
    }
}