1use crate::CallOption;
2use crate::{
3 call::ActiveCallType,
4 config::{CallRecordConfig, S3Vendor},
5};
6use anyhow::Result;
7use chrono::{DateTime, Local, Utc};
8use futures::stream::{FuturesUnordered, StreamExt};
9use object_store::PutPayload;
10use object_store::{
11 ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
12 gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
13};
14use reqwest;
15use serde::{Deserialize, Serialize};
16use serde_with::skip_serializing_none;
17use std::{
18 collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
19 time::Instant,
20};
21use tokio::{fs::File, io::AsyncWriteExt};
22use tokio_util::sync::CancellationToken;
23use tracing::{info, warn};
24
/// Sending half of the unbounded channel that carries finished [`CallRecord`]s
/// to a [`CallRecordManager`] (exposed as `CallRecordManager::sender`).
pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the same unbounded channel; held privately by the
/// [`CallRecordManager`] and drained by its receive loop.
pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
27
/// Shareable, thread-safe callback that persists a single [`CallRecord`].
///
/// The callback receives, in order: the manager's cancellation token, the
/// formatter used to render the record and its storage paths, the storage
/// configuration, and the record itself (by value). It returns a boxed `Send`
/// future that resolves to `Ok(())` on success or an error otherwise.
pub type FnSaveCallRecord = Arc<
    Box<
        dyn Fn(
                CancellationToken,
                Arc<dyn CallRecordFormatter>,
                Arc<CallRecordConfig>,
                CallRecord,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync,
    >,
>;
40
/// Category tag stored in [`CallRecordEvent::type`].
///
/// Serialized in camelCase (`"event"`, `"command"`, `"sip"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordEventType {
    Event,
    Command,
    Sip,
}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(rename_all = "camelCase")]
51pub struct CallRecordEvent {
52 pub r#type: CallRecordEventType,
53 pub timestamp: u64,
54 pub content: String,
55}
56
57impl CallRecordEvent {
58 pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
59 let content = match serde_json::to_string(&obj) {
60 Ok(s) => s,
61 Err(_) => return,
62 };
63 let event = Self {
64 r#type,
65 timestamp: crate::media::get_timestamp(),
66 content,
67 };
68 match serde_json::to_string(&event) {
69 Ok(line) => {
70 file.write_all(format!("{}\n", line).as_bytes()).await.ok();
71 }
72 Err(_) => {}
73 }
74 }
75}
76
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
/// Call detail record handed to the [`CallRecordManager`] for persistence.
///
/// Serialized with camelCase keys; `None` options are omitted
/// (`skip_serializing_none`) and the two vectors are omitted when empty.
pub struct CallRecord {
    pub call_type: ActiveCallType,
    pub option: Option<CallOption>,
    /// Call identifier; also used when building file and object names.
    pub call_id: String,
    /// Start of the call; its local-time rendering drives the dated directory
    /// and the default file name.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    pub answer_time: Option<DateTime<Utc>>,
    pub end_time: DateTime<Utc>,
    pub caller: String,
    pub callee: String,
    // NOTE(review): presumably the final SIP status code of the call — confirm.
    pub status_code: u16,
    pub hangup_reason: Option<CallRecordHangupReason>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Media files recorded for this call; their `path` is read from the local
    /// file system by the HTTP and S3 savers.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub recorder: Vec<CallRecordMedia>,
    /// Free-form additional JSON values.
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file, if one was written; uploaded together
    /// with the media by the HTTP and S3 savers.
    pub dump_event_file: Option<String>,
    // NOTE(review): looks like the record of a related (referred) call — confirm.
    pub refer_callrecord: Option<Box<CallRecord>>,
}
102
/// Description of one recorded media file attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordMedia {
    /// Track identifier; used in upload field names and object keys.
    pub track_id: String,
    /// Local file-system path of the media file.
    pub path: String,
    // NOTE(review): presumably the file size in bytes — confirm against the producer.
    pub size: u64,
    /// Optional free-form JSON metadata; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, serde_json::Value>>,
}
112
/// Why a call ended.
///
/// The serde representation uses camelCase variant names (for example
/// `"byCaller"`), which differs from the short textual form handled by the
/// `FromStr` / `to_string()` implementations (for example `"caller"`).
/// `Other` carries any free-form reason.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordHangupReason {
    ByCaller,
    ByCallee,
    ByRefer,
    BySystem,
    Autohangup,
    InactivityTimeout,
    NoAnswer,
    NoBalance,
    AnswerMachine,
    ServerUnavailable,
    Canceled,
    Rejected,
    Failed,
    Other(String),
}
131
/// One hangup-related message stored in [`CallRecord::hangup_messages`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordHangupMessage {
    // NOTE(review): presumably a SIP status code — confirm.
    pub code: u16,
    /// Optional reason text; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional target; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
141
142impl FromStr for CallRecordHangupReason {
143 type Err = anyhow::Error;
144
145 fn from_str(s: &str) -> Result<Self, Self::Err> {
146 match s.to_lowercase().as_str() {
147 "caller" => Ok(Self::ByCaller),
148 "callee" => Ok(Self::ByCallee),
149 "refer" => Ok(Self::ByRefer),
150 "system" => Ok(Self::BySystem),
151 "autohangup" => Ok(Self::Autohangup),
152 "inactivitytimeout" => Ok(Self::InactivityTimeout),
153 "noAnswer" => Ok(Self::NoAnswer),
154 "noBalance" => Ok(Self::NoBalance),
155 "answerMachine" => Ok(Self::AnswerMachine),
156 "serverUnavailable" => Ok(Self::ServerUnavailable),
157 "canceled" => Ok(Self::Canceled),
158 "rejected" => Ok(Self::Rejected),
159 "failed" => Ok(Self::Failed),
160 _ => Ok(Self::Other(s.to_string())),
161 }
162 }
163}
164impl ToString for CallRecordHangupReason {
165 fn to_string(&self) -> String {
166 match self {
167 Self::ByCaller => "caller".to_string(),
168 Self::ByCallee => "callee".to_string(),
169 Self::ByRefer => "refer".to_string(),
170 Self::BySystem => "system".to_string(),
171 Self::Autohangup => "autohangup".to_string(),
172 Self::InactivityTimeout => "inactivityTimeout".to_string(),
173 Self::NoAnswer => "noAnswer".to_string(),
174 Self::NoBalance => "noBalance".to_string(),
175 Self::AnswerMachine => "answerMachine".to_string(),
176 Self::ServerUnavailable => "serverUnavailable".to_string(),
177 Self::Canceled => "canceled".to_string(),
178 Self::Rejected => "rejected".to_string(),
179 Self::Failed => "failed".to_string(),
180 Self::Other(s) => s.to_string(),
181 }
182 }
183}
184
185pub fn default_cdr_file_name(record: &CallRecord) -> String {
186 format!(
187 "{}_{}.json",
188 record
189 .start_time
190 .with_timezone(&Local)
191 .format("%Y%m%d-%H%M%S"),
192 record.call_id
193 )
194}
195
196pub trait CallRecordFormatter: Send + Sync {
197 fn format(&self, record: &CallRecord) -> Result<String> {
198 Ok(serde_json::to_string(record)?)
199 }
200 fn format_file_name(&self, record: &CallRecord) -> String;
201 fn format_dump_events_path(&self, record: &CallRecord) -> String;
202 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
203}
204
205pub struct DefaultCallRecordFormatter {
206 pub root: String,
207}
208
209impl Default for DefaultCallRecordFormatter {
210 fn default() -> Self {
211 Self {
212 root: "./config/cdr".to_string(),
213 }
214 }
215}
216
217impl DefaultCallRecordFormatter {
218 pub fn new_with_config(config: &CallRecordConfig) -> Self {
219 let root = match config {
220 CallRecordConfig::Local { root } => root.clone(),
221 CallRecordConfig::S3 { root, .. } => root.clone(),
222 _ => "./config/cdr".to_string(),
223 };
224 Self { root }
225 }
226}
227
228impl CallRecordFormatter for DefaultCallRecordFormatter {
229 fn format_file_name(&self, record: &CallRecord) -> String {
230 let trimmed_root = self.root.trim_end_matches('/');
231 let file_name = default_cdr_file_name(record);
232 if trimmed_root.is_empty() {
233 file_name
234 } else {
235 format!(
236 "{}/{}/{}",
237 trimmed_root,
238 record.start_time.with_timezone(&Local).format("%Y%m%d"),
239 file_name
240 )
241 }
242 }
243
244 fn format_dump_events_path(&self, record: &CallRecord) -> String {
245 format!(
246 "{}/{}/{}.jsonl",
247 self.root.trim_end_matches('/'),
248 record.start_time.with_timezone(&Local).format("%Y%m%d"),
249 record.call_id
250 )
251 }
252
253 fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
254 let file_name = Path::new(&media.path)
255 .file_name()
256 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
257 .to_string_lossy()
258 .to_string();
259
260 format!(
261 "{}/{}/{}_{}_{}",
262 self.root.trim_end_matches('/'),
263 record.start_time.with_timezone(&Local).format("%Y%m%d"),
264 record.call_id,
265 media.track_id,
266 file_name
267 )
268 }
269}
270
271pub fn build_object_store_from_s3(
272 vendor: &S3Vendor,
273 bucket: &str,
274 region: &str,
275 access_key: &str,
276 secret_key: &str,
277 endpoint: &str,
278) -> Result<Arc<dyn ObjectStore>> {
279 let store: Arc<dyn ObjectStore> = match vendor {
280 S3Vendor::AWS => {
281 let builder = AmazonS3Builder::new()
282 .with_bucket_name(bucket)
283 .with_region(region)
284 .with_access_key_id(access_key)
285 .with_secret_access_key(secret_key);
286
287 let instance = if !endpoint.is_empty() {
288 builder.with_endpoint(endpoint).build()?
289 } else {
290 builder.build()?
291 };
292 Arc::new(instance)
293 }
294 S3Vendor::GCP => {
295 let instance = GoogleCloudStorageBuilder::new()
296 .with_bucket_name(bucket)
297 .with_service_account_key(secret_key)
298 .build()?;
299 Arc::new(instance)
300 }
301 S3Vendor::Azure => {
302 let instance = MicrosoftAzureBuilder::new()
303 .with_container_name(bucket)
304 .with_account(access_key)
305 .with_access_key(secret_key)
306 .build()?;
307 Arc::new(instance)
308 }
309 S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
310 let instance = AmazonS3Builder::new()
311 .with_bucket_name(bucket)
312 .with_region(region)
313 .with_access_key_id(access_key)
314 .with_secret_access_key(secret_key)
315 .with_endpoint(endpoint)
316 .with_virtual_hosted_style_request(false)
317 .build()?;
318 Arc::new(instance)
319 }
320 };
321
322 Ok(store)
323}
324
/// Receives [`CallRecord`]s over a channel and persists each one through a
/// pluggable saver callback. Built with [`CallRecordManagerBuilder`].
pub struct CallRecordManager {
    /// Concurrency limit consulted by the receive loop.
    pub max_concurrent: usize,
    /// Sending half of the record channel; records sent here are picked up by
    /// the receive loop.
    pub sender: CallRecordSender,
    /// Storage configuration passed to the saver for every record.
    config: Arc<CallRecordConfig>,
    /// Cancelling this token makes `serve` return.
    cancel_token: CancellationToken,
    /// Receiving half of the record channel.
    receiver: CallRecordReceiver,
    /// Callback that persists one record.
    saver_fn: FnSaveCallRecord,
    /// Formatter passed to the saver for every record.
    formatter: Arc<dyn CallRecordFormatter>,
}
334
335pub struct CallRecordManagerBuilder {
336 pub cancel_token: Option<CancellationToken>,
337 pub config: Option<CallRecordConfig>,
338 pub max_concurrent: Option<usize>,
339 saver_fn: Option<FnSaveCallRecord>,
340 formatter: Option<Arc<dyn CallRecordFormatter>>,
341}
342
343impl CallRecordManagerBuilder {
344 pub fn new() -> Self {
345 Self {
346 cancel_token: None,
347 config: None,
348 max_concurrent: None,
349 saver_fn: None,
350 formatter: None,
351 }
352 }
353
354 pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
355 self.cancel_token = Some(cancel_token);
356 self
357 }
358
359 pub fn with_config(mut self, config: CallRecordConfig) -> Self {
360 self.config = Some(config);
361 self
362 }
363
364 pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
365 self.saver_fn = Some(saver);
366 self
367 }
368
369 pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
370 self.formatter = Some(formatter);
371 self
372 }
373
374 pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
375 self.max_concurrent = Some(max_concurrent);
376 self
377 }
378
379 pub fn build(self) -> CallRecordManager {
380 let cancel_token = self.cancel_token.unwrap_or_default();
381 let config = Arc::new(self.config.unwrap_or_default());
382 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
383 let saver_fn = self
384 .saver_fn
385 .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
386 let formatter = self
387 .formatter
388 .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
389 let max_concurrent = self.max_concurrent.unwrap_or(64);
390
391 match config.as_ref() {
392 CallRecordConfig::Local { root } => {
393 if !Path::new(&root).exists() {
394 match std::fs::create_dir_all(&root) {
395 Ok(_) => {
396 info!("CallRecordManager created directory: {}", root);
397 }
398 Err(e) => {
399 warn!("CallRecordManager failed to create directory: {}", e);
400 }
401 }
402 }
403 }
404 _ => {}
405 }
406
407 CallRecordManager {
408 max_concurrent,
409 cancel_token,
410 sender,
411 receiver,
412 config,
413 saver_fn,
414 formatter,
415 }
416 }
417}
418
419impl CallRecordManager {
420 fn default_saver(
421 _cancel_token: CancellationToken,
422 formatter: Arc<dyn CallRecordFormatter>,
423 config: Arc<CallRecordConfig>,
424 record: CallRecord,
425 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
426 Box::pin(async move {
427 let mut record = record;
428 let start_time = Instant::now();
429 let result = match config.as_ref() {
430 CallRecordConfig::Local { .. } => {
431 Self::save_local_record(formatter.clone(), &mut record).await
432 }
433 CallRecordConfig::S3 {
434 vendor,
435 bucket,
436 region,
437 access_key,
438 secret_key,
439 endpoint,
440 with_media,
441 keep_media_copy,
442 ..
443 } => {
444 Self::save_with_s3_like(
445 formatter.clone(),
446 vendor,
447 bucket,
448 region,
449 access_key,
450 secret_key,
451 endpoint,
452 with_media,
453 keep_media_copy,
454 &record,
455 )
456 .await
457 }
458 CallRecordConfig::Http {
459 url,
460 headers,
461 with_media,
462 keep_media_copy,
463 } => {
464 Self::save_with_http(
465 formatter.clone(),
466 url,
467 headers,
468 with_media,
469 keep_media_copy,
470 &record,
471 )
472 .await
473 }
474 };
475 let file_name = match result {
476 Ok(file_name) => file_name,
477 Err(e) => {
478 warn!("Failed to save call record: {}", e);
479 return Err(e);
480 }
481 };
482 let elapsed = start_time.elapsed();
483 info!(
484 ?elapsed,
485 call_id = record.call_id,
486 file_name,
487 "CallRecordManager saved"
488 );
489 Ok(())
490 })
491 }
492
493 async fn save_local_record(
494 formatter: Arc<dyn CallRecordFormatter>,
495 record: &mut CallRecord,
496 ) -> Result<String> {
497 let file_content = formatter.format(record)?;
498 let file_name = formatter.format_file_name(record);
499
500 if let Some(parent) = Path::new(&file_name).parent() {
502 tokio::fs::create_dir_all(parent).await.map_err(|e| {
503 anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
504 })?;
505 }
506
507 let mut file = File::create(&file_name).await.map_err(|e| {
508 anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
509 })?;
510 file.write_all(file_content.as_bytes()).await?;
511 file.flush().await?;
512 Ok(file_name.to_string())
513 }
514
515 pub async fn save_with_http(
516 formatter: Arc<dyn CallRecordFormatter>,
517 url: &String,
518 headers: &Option<HashMap<String, String>>,
519 with_media: &Option<bool>,
520 keep_media_copy: &Option<bool>,
521 record: &CallRecord,
522 ) -> Result<String> {
523 let client = reqwest::Client::new();
524 let call_log_json = formatter.format(record)?;
526 let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
528
529 if with_media.unwrap_or(false) {
531 for media in &record.recorder {
532 if std::path::Path::new(&media.path).exists() {
533 match tokio::fs::read(&media.path).await {
534 Ok(file_content) => {
535 let file_name = std::path::Path::new(&media.path)
536 .file_name()
537 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
538 .to_string_lossy()
539 .to_string();
540
541 let part = match reqwest::multipart::Part::bytes(file_content)
542 .file_name(file_name.clone())
543 .mime_str("application/octet-stream")
544 {
545 Ok(part) => part,
546 Err(_) => {
547 reqwest::multipart::Part::bytes(
549 tokio::fs::read(&media.path).await?,
550 )
551 .file_name(file_name)
552 }
553 };
554
555 form = form.part(format!("media_{}", media.track_id), part);
556 }
557 Err(e) => {
558 warn!("Failed to read media file {}: {}", media.path, e);
559 }
560 }
561 }
562 }
563 if let Some(dump_events_file) = &record.dump_event_file {
564 if Path::new(&dump_events_file).exists() {
565 let file_name = Path::new(&dump_events_file)
566 .file_name()
567 .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
568 .to_string_lossy()
569 .to_string();
570 match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
571 .file_name(file_name.clone())
572 .mime_str("application/octet-stream")
573 {
574 Ok(part) => {
575 form = form.part(format!("dump_events_{}", file_name), part);
576 }
577 Err(_) => {}
578 };
579 }
580 }
581 }
582 let mut request = client.post(url).multipart(form);
583 if let Some(headers_map) = headers {
584 for (key, value) in headers_map {
585 request = request.header(key, value);
586 }
587 }
588 let response = request.send().await?;
589 if response.status().is_success() {
590 let response_text = response.text().await.unwrap_or_default();
591
592 if keep_media_copy.unwrap_or(false) {
593 for media in &record.recorder {
594 let p = Path::new(&media.path);
595 if p.exists() {
596 tokio::fs::remove_file(p).await.ok();
597 }
598 }
599 }
600 Ok(format!("HTTP upload successful: {}", response_text))
601 } else {
602 Err(anyhow::anyhow!(
603 "HTTP upload failed with status: {} - {}",
604 response.status(),
605 response.text().await.unwrap_or_default()
606 ))
607 }
608 }
609
610 pub async fn save_with_s3_like(
611 formatter: Arc<dyn CallRecordFormatter>,
612 vendor: &S3Vendor,
613 bucket: &String,
614 region: &String,
615 access_key: &String,
616 secret_key: &String,
617 endpoint: &String,
618 with_media: &Option<bool>,
619 keep_media_copy: &Option<bool>,
620 record: &CallRecord,
621 ) -> Result<String> {
622 let start_time = Instant::now();
623 let object_store =
624 build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
625
626 let call_log_json = formatter.format(record)?;
628 let filename = formatter.format_file_name(record);
630 let local_files = vec![filename.clone()];
631 let json_path = ObjectPath::from(filename);
632 let buf_size = call_log_json.len();
633 match object_store
634 .put(&json_path, PutPayload::from(call_log_json))
635 .await
636 {
637 Ok(_) => {
638 info!(
639 elapsed = start_time.elapsed().as_secs_f64(),
640 %json_path,
641 buf_size,
642 "upload call record"
643 );
644 }
645 Err(e) => {
646 warn!(
647 %json_path,
648 "failed to upload call record: {}", e
649 );
650 }
651 }
652 if with_media.unwrap_or(false) {
654 let mut media_files = vec![];
655 for media in &record.recorder {
656 if Path::new(&media.path).exists() {
657 let media_path = ObjectPath::from(formatter.format_media_path(record, media));
658 media_files.push((media.path.clone(), media_path));
659 }
660 }
661 if let Some(dump_events_file) = &record.dump_event_file {
662 if Path::new(&dump_events_file).exists() {
663 let dump_events_path =
664 ObjectPath::from(formatter.format_dump_events_path(record));
665 media_files.push((dump_events_file.clone(), dump_events_path));
666 }
667 }
668 for (path, media_path) in &media_files {
669 let start_time = Instant::now();
670 let file_content = match tokio::fs::read(path).await {
671 Ok(file_content) => file_content,
672 Err(e) => {
673 warn!("failed to read media file {}: {}", path, e);
674 continue;
675 }
676 };
677 let buf_size = file_content.len();
678 match object_store
679 .put(media_path, PutPayload::from(file_content))
680 .await
681 {
682 Ok(_) => {
683 info!(
684 elapsed = start_time.elapsed().as_secs_f64(),
685 %media_path,
686 buf_size,
687 "upload media file"
688 );
689 }
690 Err(e) => {
691 warn!(%media_path,"failed to upload media file: {}", e);
692 }
693 }
694 }
695 }
696 if !keep_media_copy.unwrap_or(false) {
698 for media in &record.recorder {
699 let p = Path::new(&media.path);
700 if p.exists() {
701 tokio::fs::remove_file(p).await.ok();
702 }
703 }
704 for file_name in &local_files {
705 let p = Path::new(file_name);
706 if p.exists() {
707 tokio::fs::remove_file(p).await.ok();
708 }
709 }
710 }
711
712 Ok(format!(
713 "{}/{}",
714 endpoint.trim_end_matches('/'),
715 json_path.to_string().trim_start_matches('/')
716 ))
717 }
718
719 pub async fn serve(&mut self) {
720 let token = self.cancel_token.clone();
721 info!("CallRecordManager serving");
722 tokio::select! {
723 _ = token.cancelled() => {}
724 _ = self.recv_loop() => {}
725 }
726 info!("CallRecordManager served");
727 }
728
729 async fn recv_loop(&mut self) -> Result<()> {
730 let mut futures = FuturesUnordered::new();
731 loop {
732 let limit = self.max_concurrent - futures.len();
733 if limit == 0 {
734 if let Some(_) = futures.next().await {}
735 continue;
736 }
737 let mut buffer = Vec::with_capacity(limit);
738 if self.receiver.recv_many(&mut buffer, limit).await == 0 {
739 break;
740 }
741
742 for record in buffer {
743 let cancel_token_ref = self.cancel_token.clone();
744 let save_fn_ref = self.saver_fn.clone();
745 let config_ref = self.config.clone();
746 let formatter_ref = self.formatter.clone();
747
748 futures.push(async move {
749 if let Err(e) =
750 save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
751 {
752 warn!("Failed to save call record: {}", e);
753 }
754 });
755 }
756 while let Some(_) = futures.next().await {}
757 }
758 Ok(())
759 }
760}