pub mod local;
pub mod remote;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Local};
use crate::config::{SipFlowClusterNode, SipFlowConfig, SipFlowEngine};
use crate::sipflow::flowdb_backend::FlowDbBackend;
use crate::sipflow::{SipFlowItem, SipFlowMediaStats};
#[async_trait]
pub trait SipFlowBackend: Send + Sync {
fn record(&self, call_id: &str, item: SipFlowItem) -> Result<()>;
async fn flush(&self) -> Result<()> {
Ok(())
}
async fn query_flow(
&self,
call_id: &str,
start_time: DateTime<Local>,
end_time: DateTime<Local>,
) -> Result<Vec<SipFlowItem>>;
async fn query_media_stats(
&self,
call_id: &str,
start_time: DateTime<Local>,
end_time: DateTime<Local>,
) -> Result<Vec<SipFlowMediaStats>>;
async fn query_media(
&self,
call_id: &str,
start_time: DateTime<Local>,
end_time: DateTime<Local>,
) -> Result<Vec<u8>>;
async fn query_media_stream(
&self,
call_id: &str,
start_time: DateTime<Local>,
end_time: DateTime<Local>,
stream_leg: Option<i32>,
) -> Result<Vec<u8>> {
let _ = stream_leg;
self.query_media(call_id, start_time, end_time).await
}
async fn generate_wav_file(
&self,
call_id: &str,
start_time: DateTime<Local>,
end_time: DateTime<Local>,
stream_leg: Option<i32>,
) -> Result<tempfile::NamedTempFile> {
let data = self
.query_media_stream(call_id, start_time, end_time, stream_leg)
.await?;
let mut file = tempfile::NamedTempFile::new()?;
std::io::Write::write_all(&mut file, &data)?;
std::io::Write::flush(&mut file)?;
Ok(file)
}
}
pub fn create_backend(config: &SipFlowConfig) -> Result<Box<dyn SipFlowBackend>> {
match config {
SipFlowConfig::Local {
root,
subdirs,
flush_count,
flush_interval_secs,
id_cache_size,
engine,
ttl_secs,
memtable_size_mb,
block_cache_capacity_mb,
..
} => {
if *engine == SipFlowEngine::FlowDb {
FlowDbBackend::new(
root.clone(),
*ttl_secs,
*memtable_size_mb,
*block_cache_capacity_mb,
*flush_count,
*flush_interval_secs,
)
.map(|b| Box::new(b) as Box<dyn SipFlowBackend>)
} else {
local::LocalBackend::new(
root.clone(),
subdirs.clone(),
*flush_count,
*flush_interval_secs,
*id_cache_size,
)
.map(|b| Box::new(b) as Box<dyn SipFlowBackend>)
}
}
SipFlowConfig::Remote {
nodes,
udp_addr,
http_addr,
timeout_secs,
..
} => {
let resolved = if !nodes.is_empty() {
nodes.clone()
} else if let (Some(udp), Some(http)) = (udp_addr, http_addr) {
vec![SipFlowClusterNode {
udp: udp.clone(),
http: http.clone(),
}]
} else {
anyhow::bail!(
"Remote backend requires either `nodes` or both `udp_addr` and `http_addr`"
)
};
remote::RemoteBackend::new(resolved, *timeout_secs)
.map(|b| Box::new(b) as Box<dyn SipFlowBackend>)
}
}
}