1use crate::{
2 call::ActiveCallType,
3 config::{CallRecordConfig, S3Vendor},
4};
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use futures::stream::{FuturesUnordered, StreamExt};
8use object_store::{
9 ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
10 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
11};
12use reqwest;
13use serde::{Deserialize, Serialize};
14use serde_with::skip_serializing_none;
15use std::{
16 collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
17 time::Instant,
18};
19use tokio::{fs::File, io::AsyncWriteExt};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22use voice_engine::CallOption;
23
/// Sending half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the unbounded tokio mpsc channel that carries [`CallRecord`] values.
pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
26
/// Shared, thread-safe callback that persists one [`CallRecord`].
///
/// The callback receives, in order: a cancellation token, the formatter, the
/// shared configuration and the record itself. It returns a boxed `Send`
/// future that resolves to `Ok(())` or an error.
pub type FnSaveCallRecord = Arc<
    Box<
        dyn Fn(
                CancellationToken,
                Arc<dyn CallRecordFormatter>,
                Arc<CallRecordConfig>,
                CallRecord,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync,
    >,
>;
39
/// Kind tag carried by every [`CallRecordEvent`].
///
/// Variant names are serialized in camelCase (`event`, `command`, `sip`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordEventType {
    Event,
    Command,
    Sip,
}
47
/// One entry of an event dump, serialized as a single JSON object with
/// camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordEvent {
    /// Kind of the entry.
    pub r#type: CallRecordEventType,
    /// Value of `voice_engine::media::get_timestamp()` when the entry was built.
    // NOTE(review): the unit (presumably milliseconds) is not visible here — confirm.
    pub timestamp: u64,
    /// JSON-encoded payload of the entry.
    pub content: String,
}
55
56impl CallRecordEvent {
57 pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
58 let content = match serde_json::to_string(&obj) {
59 Ok(s) => s,
60 Err(_) => return,
61 };
62 let event = Self {
63 r#type,
64 timestamp: voice_engine::media::get_timestamp(),
65 content,
66 };
67 match serde_json::to_string(&event) {
68 Ok(line) => {
69 file.write_all(format!("{}\n", line).as_bytes()).await.ok();
70 }
71 Err(_) => {}
72 }
73 }
74}
75
/// Summary of a single call, as handed to the [`CallRecordManager`] for
/// persistence.
///
/// Serialized with camelCase keys. `Option` fields that are `None` are left
/// out of the output (`skip_serializing_none`), and the two `Vec` fields are
/// left out when empty.
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct CallRecord {
    pub call_type: ActiveCallType,
    pub option: Option<CallOption>,
    /// Identifier of the call; also used when building file and object names.
    pub call_id: String,
    /// Start of the call; drives the date-based part of generated paths.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    pub answer_time: Option<DateTime<Utc>>,
    pub end_time: DateTime<Utc>,
    pub caller: String,
    pub callee: String,
    // NOTE(review): presumably the final SIP status code — confirm with the producer.
    pub status_code: u16,
    pub hangup_reason: Option<CallRecordHangupReason>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Media files recorded for this call; the savers read them from `path`.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub recorder: Vec<CallRecordMedia>,
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file, if one was written.
    pub dump_event_file: Option<String>,
    /// Nested record of a related call.
    // NOTE(review): presumably the call created by a REFER/transfer — confirm.
    pub refer_callrecord: Option<Box<CallRecord>>,
}
101
/// Description of one recorded media file attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordMedia {
    /// Identifier of the track; becomes part of upload part/object names.
    pub track_id: String,
    /// Local filesystem path the savers read the media from.
    pub path: String,
    // NOTE(review): presumably the file size in bytes — confirm with the producer.
    pub size: u64,
    /// Free-form extra attributes; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, serde_json::Value>>,
}
111
/// Why a call ended.
///
/// Serde uses camelCase variant names. The `FromStr` and `to_string`
/// conversions defined in this file use their own short names
/// (for example `caller` for [`CallRecordHangupReason::ByCaller`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordHangupReason {
    ByCaller,
    ByCallee,
    ByRefer,
    BySystem,
    Autohangup,
    NoAnswer,
    NoBalance,
    AnswerMachine,
    ServerUnavailable,
    Canceled,
    Rejected,
    Failed,
    /// Any reason without a dedicated variant; holds the text verbatim.
    Other(String),
}
129
/// One hangup-related message attached to a [`CallRecord`].
///
/// `reason` and `target` are omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordHangupMessage {
    // NOTE(review): presumably a SIP status code — confirm with the producer.
    pub code: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
139
140impl FromStr for CallRecordHangupReason {
141 type Err = anyhow::Error;
142
143 fn from_str(s: &str) -> Result<Self, Self::Err> {
144 match s.to_lowercase().as_str() {
145 "caller" => Ok(Self::ByCaller),
146 "callee" => Ok(Self::ByCallee),
147 "refer" => Ok(Self::ByRefer),
148 "system" => Ok(Self::BySystem),
149 "autohangup" => Ok(Self::Autohangup),
150 "noAnswer" => Ok(Self::NoAnswer),
151 "noBalance" => Ok(Self::NoBalance),
152 "answerMachine" => Ok(Self::AnswerMachine),
153 "serverUnavailable" => Ok(Self::ServerUnavailable),
154 "canceled" => Ok(Self::Canceled),
155 "rejected" => Ok(Self::Rejected),
156 "failed" => Ok(Self::Failed),
157 _ => Ok(Self::Other(s.to_string())),
158 }
159 }
160}
161impl ToString for CallRecordHangupReason {
162 fn to_string(&self) -> String {
163 match self {
164 Self::ByCaller => "caller".to_string(),
165 Self::ByCallee => "callee".to_string(),
166 Self::ByRefer => "refer".to_string(),
167 Self::BySystem => "system".to_string(),
168 Self::Autohangup => "autohangup".to_string(),
169 Self::NoAnswer => "noAnswer".to_string(),
170 Self::NoBalance => "noBalance".to_string(),
171 Self::AnswerMachine => "answerMachine".to_string(),
172 Self::ServerUnavailable => "serverUnavailable".to_string(),
173 Self::Canceled => "canceled".to_string(),
174 Self::Rejected => "rejected".to_string(),
175 Self::Failed => "failed".to_string(),
176 Self::Other(s) => s.to_string(),
177 }
178 }
179}
180
181pub fn default_cdr_file_name(record: &CallRecord) -> String {
182 format!(
183 "{}_{}.json",
184 record.start_time.format("%Y%m%d-%H%M%S"),
185 record.call_id
186 )
187}
188
189pub trait CallRecordFormatter: Send + Sync {
190 fn format(&self, record: &CallRecord) -> Result<String> {
191 Ok(serde_json::to_string(record)?)
192 }
193 fn format_file_name(&self, record: &CallRecord) -> String;
194 fn format_dump_events_path(&self, record: &CallRecord) -> String;
195 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
196}
197
/// [`CallRecordFormatter`] that places everything below a single root path.
pub struct DefaultCallRecordFormatter {
    /// Root directory (or key prefix); trailing `/` characters are ignored
    /// when paths are built.
    pub root: String,
}
201
202impl Default for DefaultCallRecordFormatter {
203 fn default() -> Self {
204 Self {
205 root: "./config/cdr".to_string(),
206 }
207 }
208}
209
210impl DefaultCallRecordFormatter {
211 pub fn new_with_config(config: &CallRecordConfig) -> Self {
212 let root = match config {
213 CallRecordConfig::Local { root } => root.clone(),
214 CallRecordConfig::S3 { root, .. } => root.clone(),
215 _ => "./config/cdr".to_string(),
216 };
217 Self { root }
218 }
219}
220
221impl CallRecordFormatter for DefaultCallRecordFormatter {
222 fn format_file_name(&self, record: &CallRecord) -> String {
223 let trimmed_root = self.root.trim_end_matches('/');
224 let file_name = default_cdr_file_name(record);
225 if trimmed_root.is_empty() {
226 file_name
227 } else {
228 format!(
229 "{}/{}/{}",
230 trimmed_root,
231 record.start_time.format("%Y%m%d"),
232 file_name
233 )
234 }
235 }
236
237 fn format_dump_events_path(&self, record: &CallRecord) -> String {
238 format!(
239 "{}/{}/{}.jsonl",
240 self.root.trim_end_matches('/'),
241 record.start_time.format("%Y%m%d"),
242 record.call_id
243 )
244 }
245
246 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
247 let file_name = Path::new(&media.path)
248 .file_name()
249 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
250 .to_string_lossy()
251 .to_string();
252
253 format!(
254 "{}/{}/{}_{}_{}",
255 self.root.trim_end_matches('/'),
256 record.start_time.format("%Y%m%d"),
257 record.call_id,
258 media.track_id,
259 file_name
260 )
261 }
262}
263
264pub fn build_object_store_from_s3(
265 vendor: &S3Vendor,
266 bucket: &str,
267 region: &str,
268 access_key: &str,
269 secret_key: &str,
270 endpoint: &str,
271) -> Result<Arc<dyn ObjectStore>> {
272 let store: Arc<dyn ObjectStore> = match vendor {
273 S3Vendor::AWS => {
274 let builder = AmazonS3Builder::new()
275 .with_bucket_name(bucket)
276 .with_region(region)
277 .with_access_key_id(access_key)
278 .with_secret_access_key(secret_key);
279
280 let instance = if !endpoint.is_empty() {
281 builder.with_endpoint(endpoint).build()?
282 } else {
283 builder.build()?
284 };
285 Arc::new(instance)
286 }
287 S3Vendor::GCP => {
288 let instance = GoogleCloudStorageBuilder::new()
289 .with_bucket_name(bucket)
290 .with_service_account_key(secret_key)
291 .build()?;
292 Arc::new(instance)
293 }
294 S3Vendor::Azure => {
295 let instance = MicrosoftAzureBuilder::new()
296 .with_container_name(bucket)
297 .with_account(access_key)
298 .with_access_key(secret_key)
299 .build()?;
300 Arc::new(instance)
301 }
302 S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
303 let instance = AmazonS3Builder::new()
304 .with_bucket_name(bucket)
305 .with_region(region)
306 .with_access_key_id(access_key)
307 .with_secret_access_key(secret_key)
308 .with_endpoint(endpoint)
309 .with_virtual_hosted_style_request(false)
310 .build()?;
311 Arc::new(instance)
312 }
313 };
314
315 Ok(store)
316}
317
/// Receives [`CallRecord`]s over a channel and persists each one through the
/// configured saver callback.
pub struct CallRecordManager {
    /// Limit used by the receive loop when sizing how many records it takes
    /// on at once.
    pub max_concurrent: usize,
    /// Sending half of the record channel; the matching receiver is drained
    /// by `serve`.
    pub sender: CallRecordSender,
    // Storage configuration handed to the saver for every record.
    config: Arc<CallRecordConfig>,
    // Cancelling this token makes `serve` return.
    cancel_token: CancellationToken,
    receiver: CallRecordReceiver,
    // Callback that performs the actual save.
    saver_fn: FnSaveCallRecord,
    // Formatter handed to the saver for every record.
    formatter: Arc<dyn CallRecordFormatter>,
}
327
/// Builder for [`CallRecordManager`]; every field is optional and gets a
/// default in `build`.
pub struct CallRecordManagerBuilder {
    /// Token that stops the manager; a fresh token is used when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Storage configuration; `CallRecordConfig::default()` is used when unset.
    pub config: Option<CallRecordConfig>,
    /// Concurrency limit for the manager; `build` supplies a default when unset.
    pub max_concurrent: Option<usize>,
    // Custom save callback; the manager's built-in saver is used when unset.
    saver_fn: Option<FnSaveCallRecord>,
    // Custom formatter; a `DefaultCallRecordFormatter` is used when unset.
    formatter: Option<Arc<dyn CallRecordFormatter>>,
}
335
336impl CallRecordManagerBuilder {
337 pub fn new() -> Self {
338 Self {
339 cancel_token: None,
340 config: None,
341 max_concurrent: None,
342 saver_fn: None,
343 formatter: None,
344 }
345 }
346
347 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348 self.cancel_token = Some(cancel_token);
349 self
350 }
351
352 pub fn with_config(mut self, config: CallRecordConfig) -> Self {
353 self.config = Some(config);
354 self
355 }
356
357 pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
358 self.saver_fn = Some(saver);
359 self
360 }
361
362 pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
363 self.formatter = Some(formatter);
364 self
365 }
366
367 pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
368 self.max_concurrent = Some(max_concurrent);
369 self
370 }
371
372 pub fn build(self) -> CallRecordManager {
373 let cancel_token = self.cancel_token.unwrap_or_default();
374 let config = Arc::new(self.config.unwrap_or_default());
375 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
376 let saver_fn = self
377 .saver_fn
378 .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
379 let formatter = self
380 .formatter
381 .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
382 let max_concurrent = self.max_concurrent.unwrap_or(64);
383
384 match config.as_ref() {
385 CallRecordConfig::Local { root } => {
386 if !Path::new(&root).exists() {
387 match std::fs::create_dir_all(&root) {
388 Ok(_) => {
389 info!("CallRecordManager created directory: {}", root);
390 }
391 Err(e) => {
392 warn!("CallRecordManager failed to create directory: {}", e);
393 }
394 }
395 }
396 }
397 _ => {}
398 }
399
400 CallRecordManager {
401 max_concurrent,
402 cancel_token,
403 sender,
404 receiver,
405 config,
406 saver_fn,
407 formatter,
408 }
409 }
410}
411
412impl CallRecordManager {
413 fn default_saver(
414 _cancel_token: CancellationToken,
415 formatter: Arc<dyn CallRecordFormatter>,
416 config: Arc<CallRecordConfig>,
417 record: CallRecord,
418 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
419 Box::pin(async move {
420 let mut record = record;
421 let start_time = Instant::now();
422 let result = match config.as_ref() {
423 CallRecordConfig::Local { .. } => {
424 Self::save_local_record(formatter.clone(), &mut record).await
425 }
426 CallRecordConfig::S3 {
427 vendor,
428 bucket,
429 region,
430 access_key,
431 secret_key,
432 endpoint,
433 with_media,
434 keep_media_copy,
435 ..
436 } => {
437 Self::save_with_s3_like(
438 formatter.clone(),
439 vendor,
440 bucket,
441 region,
442 access_key,
443 secret_key,
444 endpoint,
445 with_media,
446 keep_media_copy,
447 &record,
448 )
449 .await
450 }
451 CallRecordConfig::Http {
452 url,
453 headers,
454 with_media,
455 keep_media_copy,
456 } => {
457 Self::save_with_http(
458 formatter.clone(),
459 url,
460 headers,
461 with_media,
462 keep_media_copy,
463 &record,
464 )
465 .await
466 }
467 };
468 let file_name = match result {
469 Ok(file_name) => file_name,
470 Err(e) => {
471 warn!("Failed to save call record: {}", e);
472 return Err(e);
473 }
474 };
475 let elapsed = start_time.elapsed();
476 info!(
477 ?elapsed,
478 call_id = record.call_id,
479 file_name,
480 "CallRecordManager saved"
481 );
482 Ok(())
483 })
484 }
485
486 async fn save_local_record(
487 formatter: Arc<dyn CallRecordFormatter>,
488 record: &mut CallRecord,
489 ) -> Result<String> {
490 let file_content = formatter.format(record)?;
491 let file_name = formatter.format_file_name(record);
492 let mut file = File::create(&file_name).await.map_err(|e| {
493 anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
494 })?;
495 file.write_all(file_content.as_bytes()).await?;
496 file.flush().await?;
497 Ok(file_name.to_string())
498 }
499
500 pub async fn save_with_http(
501 formatter: Arc<dyn CallRecordFormatter>,
502 url: &String,
503 headers: &Option<HashMap<String, String>>,
504 with_media: &Option<bool>,
505 keep_media_copy: &Option<bool>,
506 record: &CallRecord,
507 ) -> Result<String> {
508 let client = reqwest::Client::new();
509 let call_log_json = formatter.format(record)?;
511 let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
513
514 if with_media.unwrap_or(false) {
516 for media in &record.recorder {
517 if std::path::Path::new(&media.path).exists() {
518 match tokio::fs::read(&media.path).await {
519 Ok(file_content) => {
520 let file_name = std::path::Path::new(&media.path)
521 .file_name()
522 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
523 .to_string_lossy()
524 .to_string();
525
526 let part = match reqwest::multipart::Part::bytes(file_content)
527 .file_name(file_name.clone())
528 .mime_str("application/octet-stream")
529 {
530 Ok(part) => part,
531 Err(_) => {
532 reqwest::multipart::Part::bytes(
534 tokio::fs::read(&media.path).await?,
535 )
536 .file_name(file_name)
537 }
538 };
539
540 form = form.part(format!("media_{}", media.track_id), part);
541 }
542 Err(e) => {
543 warn!("Failed to read media file {}: {}", media.path, e);
544 }
545 }
546 }
547 }
548 if let Some(dump_events_file) = &record.dump_event_file {
549 if Path::new(&dump_events_file).exists() {
550 let file_name = Path::new(&dump_events_file)
551 .file_name()
552 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
553 .to_string_lossy()
554 .to_string();
555 match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
556 .file_name(file_name.clone())
557 .mime_str("application/octet-stream")
558 {
559 Ok(part) => {
560 form = form.part(format!("dump_events_{}", file_name), part);
561 }
562 Err(_) => {}
563 };
564 }
565 }
566 }
567 let mut request = client.post(url).multipart(form);
568 if let Some(headers_map) = headers {
569 for (key, value) in headers_map {
570 request = request.header(key, value);
571 }
572 }
573 let response = request.send().await?;
574 if response.status().is_success() {
575 let response_text = response.text().await.unwrap_or_default();
576
577 if keep_media_copy.unwrap_or(false) {
578 for media in &record.recorder {
579 let p = Path::new(&media.path);
580 if p.exists() {
581 tokio::fs::remove_file(p).await.ok();
582 }
583 }
584 }
585 Ok(format!("HTTP upload successful: {}", response_text))
586 } else {
587 Err(anyhow::anyhow!(
588 "HTTP upload failed with status: {} - {}",
589 response.status(),
590 response.text().await.unwrap_or_default()
591 ))
592 }
593 }
594
595 pub async fn save_with_s3_like(
596 formatter: Arc<dyn CallRecordFormatter>,
597 vendor: &S3Vendor,
598 bucket: &String,
599 region: &String,
600 access_key: &String,
601 secret_key: &String,
602 endpoint: &String,
603 with_media: &Option<bool>,
604 keep_media_copy: &Option<bool>,
605 record: &CallRecord,
606 ) -> Result<String> {
607 let start_time = Instant::now();
608 let object_store =
609 build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
610
611 let call_log_json = formatter.format(record)?;
613 let filename = formatter.format_file_name(record);
615 let local_files = vec![filename.clone()];
616 let json_path = ObjectPath::from(filename);
617 let buf_size = call_log_json.len();
618 match object_store.put(&json_path, call_log_json.into()).await {
619 Ok(_) => {
620 info!(
621 elapsed = start_time.elapsed().as_secs_f64(),
622 %json_path,
623 buf_size,
624 "upload call record"
625 );
626 }
627 Err(e) => {
628 warn!(
629 %json_path,
630 "failed to upload call record: {}", e
631 );
632 }
633 }
634 if with_media.unwrap_or(false) {
636 let mut media_files = vec![];
637 for media in &record.recorder {
638 if Path::new(&media.path).exists() {
639 let media_path = ObjectPath::from(formatter.format_media_path(record, media));
640 media_files.push((media.path.clone(), media_path));
641 }
642 }
643 if let Some(dump_events_file) = &record.dump_event_file {
644 if Path::new(&dump_events_file).exists() {
645 let dump_events_path =
646 ObjectPath::from(formatter.format_dump_events_path(record));
647 media_files.push((dump_events_file.clone(), dump_events_path));
648 }
649 }
650 for (path, media_path) in &media_files {
651 let start_time = Instant::now();
652 let file_content = match tokio::fs::read(path).await {
653 Ok(file_content) => file_content,
654 Err(e) => {
655 warn!("failed to read media file {}: {}", path, e);
656 continue;
657 }
658 };
659 let buf_size = file_content.len();
660 match object_store.put(media_path, file_content.into()).await {
661 Ok(_) => {
662 info!(
663 elapsed = start_time.elapsed().as_secs_f64(),
664 %media_path,
665 buf_size,
666 "upload media file"
667 );
668 }
669 Err(e) => {
670 warn!(%media_path,"failed to upload media file: {}", e);
671 }
672 }
673 }
674 }
675 if !keep_media_copy.unwrap_or(false) {
677 for media in &record.recorder {
678 let p = Path::new(&media.path);
679 if p.exists() {
680 tokio::fs::remove_file(p).await.ok();
681 }
682 }
683 for file_name in &local_files {
684 let p = Path::new(file_name);
685 if p.exists() {
686 tokio::fs::remove_file(p).await.ok();
687 }
688 }
689 }
690
691 Ok(format!(
692 "{}/{}",
693 endpoint.trim_end_matches('/'),
694 json_path.to_string().trim_start_matches('/')
695 ))
696 }
697
698 pub async fn serve(&mut self) {
699 let token = self.cancel_token.clone();
700 info!("CallRecordManager serving");
701 tokio::select! {
702 _ = token.cancelled() => {}
703 _ = self.recv_loop() => {}
704 }
705 info!("CallRecordManager served");
706 }
707
708 async fn recv_loop(&mut self) -> Result<()> {
709 let mut futures = FuturesUnordered::new();
710 loop {
711 let limit = self.max_concurrent - futures.len();
712 if limit == 0 {
713 if let Some(_) = futures.next().await {}
714 continue;
715 }
716 let mut buffer = Vec::with_capacity(limit);
717 if self.receiver.recv_many(&mut buffer, limit).await == 0 {
718 break;
719 }
720
721 for record in buffer {
722 let cancel_token_ref = self.cancel_token.clone();
723 let save_fn_ref = self.saver_fn.clone();
724 let config_ref = self.config.clone();
725 let formatter_ref = self.formatter.clone();
726
727 futures.push(async move {
728 if let Err(e) =
729 save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
730 {
731 warn!("Failed to save call record: {}", e);
732 }
733 });
734 }
735 while let Some(_) = futures.next().await {}
736 }
737 Ok(())
738 }
739}