active_call/callrecord/
mod.rs

1use crate::{
2    call::ActiveCallType,
3    config::{CallRecordConfig, S3Vendor},
4};
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use futures::stream::{FuturesUnordered, StreamExt};
8use object_store::{
9    ObjectStore, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
10    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
11};
12use reqwest;
13use serde::{Deserialize, Serialize};
14use serde_with::skip_serializing_none;
15use std::{
16    collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
17    time::Instant,
18};
19use tokio::{fs::File, io::AsyncWriteExt};
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22use voice_engine::CallOption;
23
24pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
25pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
26
27pub type FnSaveCallRecord = Arc<
28    Box<
29        dyn Fn(
30                CancellationToken,
31                Arc<dyn CallRecordFormatter>,
32                Arc<CallRecordConfig>,
33                CallRecord,
34            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
35            + Send
36            + Sync,
37    >,
38>;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41#[serde(rename_all = "camelCase")]
42pub enum CallRecordEventType {
43    Event,
44    Command,
45    Sip,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct CallRecordEvent {
51    pub r#type: CallRecordEventType,
52    pub timestamp: u64,
53    pub content: String,
54}
55
56impl CallRecordEvent {
57    pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
58        let content = match serde_json::to_string(&obj) {
59            Ok(s) => s,
60            Err(_) => return,
61        };
62        let event = Self {
63            r#type,
64            timestamp: voice_engine::media::get_timestamp(),
65            content,
66        };
67        match serde_json::to_string(&event) {
68            Ok(line) => {
69                file.write_all(format!("{}\n", line).as_bytes()).await.ok();
70            }
71            Err(_) => {}
72        }
73    }
74}
75
76#[skip_serializing_none]
77#[derive(Debug, Clone, Serialize, Deserialize, Default)]
78#[serde(rename_all = "camelCase")]
79pub struct CallRecord {
80    pub call_type: ActiveCallType,
81    pub option: Option<CallOption>,
82    pub call_id: String,
83    pub start_time: DateTime<Utc>,
84    pub ring_time: Option<DateTime<Utc>>,
85    pub answer_time: Option<DateTime<Utc>>,
86    pub end_time: DateTime<Utc>,
87    pub caller: String,
88    pub callee: String,
89    pub status_code: u16,
90    pub hangup_reason: Option<CallRecordHangupReason>,
91    #[serde(skip_serializing_if = "Vec::is_empty")]
92    #[serde(default)]
93    pub hangup_messages: Vec<CallRecordHangupMessage>,
94    #[serde(skip_serializing_if = "Vec::is_empty")]
95    #[serde(default)]
96    pub recorder: Vec<CallRecordMedia>,
97    pub extras: Option<HashMap<String, serde_json::Value>>,
98    pub dump_event_file: Option<String>,
99    pub refer_callrecord: Option<Box<CallRecord>>,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct CallRecordMedia {
105    pub track_id: String,
106    pub path: String,
107    pub size: u64,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub extra: Option<HashMap<String, serde_json::Value>>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
113#[serde(rename_all = "camelCase")]
114pub enum CallRecordHangupReason {
115    ByCaller,
116    ByCallee,
117    ByRefer,
118    BySystem,
119    Autohangup,
120    NoAnswer,
121    NoBalance,
122    AnswerMachine,
123    ServerUnavailable,
124    Canceled,
125    Rejected,
126    Failed,
127    Other(String),
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(rename_all = "camelCase")]
132pub struct CallRecordHangupMessage {
133    pub code: u16,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub reason: Option<String>,
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub target: Option<String>,
138}
139
140impl FromStr for CallRecordHangupReason {
141    type Err = anyhow::Error;
142
143    fn from_str(s: &str) -> Result<Self, Self::Err> {
144        match s.to_lowercase().as_str() {
145            "caller" => Ok(Self::ByCaller),
146            "callee" => Ok(Self::ByCallee),
147            "refer" => Ok(Self::ByRefer),
148            "system" => Ok(Self::BySystem),
149            "autohangup" => Ok(Self::Autohangup),
150            "noAnswer" => Ok(Self::NoAnswer),
151            "noBalance" => Ok(Self::NoBalance),
152            "answerMachine" => Ok(Self::AnswerMachine),
153            "serverUnavailable" => Ok(Self::ServerUnavailable),
154            "canceled" => Ok(Self::Canceled),
155            "rejected" => Ok(Self::Rejected),
156            "failed" => Ok(Self::Failed),
157            _ => Ok(Self::Other(s.to_string())),
158        }
159    }
160}
161impl ToString for CallRecordHangupReason {
162    fn to_string(&self) -> String {
163        match self {
164            Self::ByCaller => "caller".to_string(),
165            Self::ByCallee => "callee".to_string(),
166            Self::ByRefer => "refer".to_string(),
167            Self::BySystem => "system".to_string(),
168            Self::Autohangup => "autohangup".to_string(),
169            Self::NoAnswer => "noAnswer".to_string(),
170            Self::NoBalance => "noBalance".to_string(),
171            Self::AnswerMachine => "answerMachine".to_string(),
172            Self::ServerUnavailable => "serverUnavailable".to_string(),
173            Self::Canceled => "canceled".to_string(),
174            Self::Rejected => "rejected".to_string(),
175            Self::Failed => "failed".to_string(),
176            Self::Other(s) => s.to_string(),
177        }
178    }
179}
180
181pub fn default_cdr_file_name(record: &CallRecord) -> String {
182    format!(
183        "{}_{}.json",
184        record.start_time.format("%Y%m%d-%H%M%S"),
185        record.call_id
186    )
187}
188
189pub trait CallRecordFormatter: Send + Sync {
190    fn format(&self, record: &CallRecord) -> Result<String> {
191        Ok(serde_json::to_string(record)?)
192    }
193    fn format_file_name(&self, record: &CallRecord) -> String;
194    fn format_dump_events_path(&self, record: &CallRecord) -> String;
195    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
196}
197
/// Built-in [`CallRecordFormatter`] that lays files out as `<root>/<YYYYMMDD>/<name>`.
pub struct DefaultCallRecordFormatter {
    /// Directory (or key prefix) under which all generated paths are placed.
    pub root: String,
}
201
202impl Default for DefaultCallRecordFormatter {
203    fn default() -> Self {
204        Self {
205            root: "./config/cdr".to_string(),
206        }
207    }
208}
209
210impl DefaultCallRecordFormatter {
211    pub fn new_with_config(config: &CallRecordConfig) -> Self {
212        let root = match config {
213            CallRecordConfig::Local { root } => root.clone(),
214            CallRecordConfig::S3 { root, .. } => root.clone(),
215            _ => "./config/cdr".to_string(),
216        };
217        Self { root }
218    }
219}
220
221impl CallRecordFormatter for DefaultCallRecordFormatter {
222    fn format_file_name(&self, record: &CallRecord) -> String {
223        let trimmed_root = self.root.trim_end_matches('/');
224        let file_name = default_cdr_file_name(record);
225        if trimmed_root.is_empty() {
226            file_name
227        } else {
228            format!(
229                "{}/{}/{}",
230                trimmed_root,
231                record.start_time.format("%Y%m%d"),
232                file_name
233            )
234        }
235    }
236
237    fn format_dump_events_path(&self, record: &CallRecord) -> String {
238        format!(
239            "{}/{}/{}.jsonl",
240            self.root.trim_end_matches('/'),
241            record.start_time.format("%Y%m%d"),
242            record.call_id
243        )
244    }
245
246    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
247        let file_name = Path::new(&media.path)
248            .file_name()
249            .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
250            .to_string_lossy()
251            .to_string();
252
253        format!(
254            "{}/{}/{}_{}_{}",
255            self.root.trim_end_matches('/'),
256            record.start_time.format("%Y%m%d"),
257            record.call_id,
258            media.track_id,
259            file_name
260        )
261    }
262}
263
264pub fn build_object_store_from_s3(
265    vendor: &S3Vendor,
266    bucket: &str,
267    region: &str,
268    access_key: &str,
269    secret_key: &str,
270    endpoint: &str,
271) -> Result<Arc<dyn ObjectStore>> {
272    let store: Arc<dyn ObjectStore> = match vendor {
273        S3Vendor::AWS => {
274            let builder = AmazonS3Builder::new()
275                .with_bucket_name(bucket)
276                .with_region(region)
277                .with_access_key_id(access_key)
278                .with_secret_access_key(secret_key);
279
280            let instance = if !endpoint.is_empty() {
281                builder.with_endpoint(endpoint).build()?
282            } else {
283                builder.build()?
284            };
285            Arc::new(instance)
286        }
287        S3Vendor::GCP => {
288            let instance = GoogleCloudStorageBuilder::new()
289                .with_bucket_name(bucket)
290                .with_service_account_key(secret_key)
291                .build()?;
292            Arc::new(instance)
293        }
294        S3Vendor::Azure => {
295            let instance = MicrosoftAzureBuilder::new()
296                .with_container_name(bucket)
297                .with_account(access_key)
298                .with_access_key(secret_key)
299                .build()?;
300            Arc::new(instance)
301        }
302        S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
303            let instance = AmazonS3Builder::new()
304                .with_bucket_name(bucket)
305                .with_region(region)
306                .with_access_key_id(access_key)
307                .with_secret_access_key(secret_key)
308                .with_endpoint(endpoint)
309                .with_virtual_hosted_style_request(false)
310                .build()?;
311            Arc::new(instance)
312        }
313    };
314
315    Ok(store)
316}
317
318pub struct CallRecordManager {
319    pub max_concurrent: usize,
320    pub sender: CallRecordSender,
321    config: Arc<CallRecordConfig>,
322    cancel_token: CancellationToken,
323    receiver: CallRecordReceiver,
324    saver_fn: FnSaveCallRecord,
325    formatter: Arc<dyn CallRecordFormatter>,
326}
327
328pub struct CallRecordManagerBuilder {
329    pub cancel_token: Option<CancellationToken>,
330    pub config: Option<CallRecordConfig>,
331    pub max_concurrent: Option<usize>,
332    saver_fn: Option<FnSaveCallRecord>,
333    formatter: Option<Arc<dyn CallRecordFormatter>>,
334}
335
336impl CallRecordManagerBuilder {
337    pub fn new() -> Self {
338        Self {
339            cancel_token: None,
340            config: None,
341            max_concurrent: None,
342            saver_fn: None,
343            formatter: None,
344        }
345    }
346
347    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348        self.cancel_token = Some(cancel_token);
349        self
350    }
351
352    pub fn with_config(mut self, config: CallRecordConfig) -> Self {
353        self.config = Some(config);
354        self
355    }
356
357    pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
358        self.saver_fn = Some(saver);
359        self
360    }
361
362    pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
363        self.formatter = Some(formatter);
364        self
365    }
366
367    pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
368        self.max_concurrent = Some(max_concurrent);
369        self
370    }
371
372    pub fn build(self) -> CallRecordManager {
373        let cancel_token = self.cancel_token.unwrap_or_default();
374        let config = Arc::new(self.config.unwrap_or_default());
375        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
376        let saver_fn = self
377            .saver_fn
378            .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
379        let formatter = self
380            .formatter
381            .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
382        let max_concurrent = self.max_concurrent.unwrap_or(64);
383
384        match config.as_ref() {
385            CallRecordConfig::Local { root } => {
386                if !Path::new(&root).exists() {
387                    match std::fs::create_dir_all(&root) {
388                        Ok(_) => {
389                            info!("CallRecordManager created directory: {}", root);
390                        }
391                        Err(e) => {
392                            warn!("CallRecordManager failed to create directory: {}", e);
393                        }
394                    }
395                }
396            }
397            _ => {}
398        }
399
400        CallRecordManager {
401            max_concurrent,
402            cancel_token,
403            sender,
404            receiver,
405            config,
406            saver_fn,
407            formatter,
408        }
409    }
410}
411
412impl CallRecordManager {
413    fn default_saver(
414        _cancel_token: CancellationToken,
415        formatter: Arc<dyn CallRecordFormatter>,
416        config: Arc<CallRecordConfig>,
417        record: CallRecord,
418    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
419        Box::pin(async move {
420            let mut record = record;
421            let start_time = Instant::now();
422            let result = match config.as_ref() {
423                CallRecordConfig::Local { .. } => {
424                    Self::save_local_record(formatter.clone(), &mut record).await
425                }
426                CallRecordConfig::S3 {
427                    vendor,
428                    bucket,
429                    region,
430                    access_key,
431                    secret_key,
432                    endpoint,
433                    with_media,
434                    keep_media_copy,
435                    ..
436                } => {
437                    Self::save_with_s3_like(
438                        formatter.clone(),
439                        vendor,
440                        bucket,
441                        region,
442                        access_key,
443                        secret_key,
444                        endpoint,
445                        with_media,
446                        keep_media_copy,
447                        &record,
448                    )
449                    .await
450                }
451                CallRecordConfig::Http {
452                    url,
453                    headers,
454                    with_media,
455                    keep_media_copy,
456                } => {
457                    Self::save_with_http(
458                        formatter.clone(),
459                        url,
460                        headers,
461                        with_media,
462                        keep_media_copy,
463                        &record,
464                    )
465                    .await
466                }
467            };
468            let file_name = match result {
469                Ok(file_name) => file_name,
470                Err(e) => {
471                    warn!("Failed to save call record: {}", e);
472                    return Err(e);
473                }
474            };
475            let elapsed = start_time.elapsed();
476            info!(
477                ?elapsed,
478                call_id = record.call_id,
479                file_name,
480                "CallRecordManager saved"
481            );
482            Ok(())
483        })
484    }
485
486    async fn save_local_record(
487        formatter: Arc<dyn CallRecordFormatter>,
488        record: &mut CallRecord,
489    ) -> Result<String> {
490        let file_content = formatter.format(record)?;
491        let file_name = formatter.format_file_name(record);
492        let mut file = File::create(&file_name).await.map_err(|e| {
493            anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
494        })?;
495        file.write_all(file_content.as_bytes()).await?;
496        file.flush().await?;
497        Ok(file_name.to_string())
498    }
499
500    pub async fn save_with_http(
501        formatter: Arc<dyn CallRecordFormatter>,
502        url: &String,
503        headers: &Option<HashMap<String, String>>,
504        with_media: &Option<bool>,
505        keep_media_copy: &Option<bool>,
506        record: &CallRecord,
507    ) -> Result<String> {
508        let client = reqwest::Client::new();
509        // Serialize call record to JSON
510        let call_log_json = formatter.format(record)?;
511        // Create multipart form
512        let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
513
514        // Add media files if with_media is true
515        if with_media.unwrap_or(false) {
516            for media in &record.recorder {
517                if std::path::Path::new(&media.path).exists() {
518                    match tokio::fs::read(&media.path).await {
519                        Ok(file_content) => {
520                            let file_name = std::path::Path::new(&media.path)
521                                .file_name()
522                                .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
523                                .to_string_lossy()
524                                .to_string();
525
526                            let part = match reqwest::multipart::Part::bytes(file_content)
527                                .file_name(file_name.clone())
528                                .mime_str("application/octet-stream")
529                            {
530                                Ok(part) => part,
531                                Err(_) => {
532                                    // Fallback to default MIME type if parsing fails
533                                    reqwest::multipart::Part::bytes(
534                                        tokio::fs::read(&media.path).await?,
535                                    )
536                                    .file_name(file_name)
537                                }
538                            };
539
540                            form = form.part(format!("media_{}", media.track_id), part);
541                        }
542                        Err(e) => {
543                            warn!("Failed to read media file {}: {}", media.path, e);
544                        }
545                    }
546                }
547            }
548            if let Some(dump_events_file) = &record.dump_event_file {
549                if Path::new(&dump_events_file).exists() {
550                    let file_name = Path::new(&dump_events_file)
551                        .file_name()
552                        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
553                        .to_string_lossy()
554                        .to_string();
555                    match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
556                        .file_name(file_name.clone())
557                        .mime_str("application/octet-stream")
558                    {
559                        Ok(part) => {
560                            form = form.part(format!("dump_events_{}", file_name), part);
561                        }
562                        Err(_) => {}
563                    };
564                }
565            }
566        }
567        let mut request = client.post(url).multipart(form);
568        if let Some(headers_map) = headers {
569            for (key, value) in headers_map {
570                request = request.header(key, value);
571            }
572        }
573        let response = request.send().await?;
574        if response.status().is_success() {
575            let response_text = response.text().await.unwrap_or_default();
576
577            if keep_media_copy.unwrap_or(false) {
578                for media in &record.recorder {
579                    let p = Path::new(&media.path);
580                    if p.exists() {
581                        tokio::fs::remove_file(p).await.ok();
582                    }
583                }
584            }
585            Ok(format!("HTTP upload successful: {}", response_text))
586        } else {
587            Err(anyhow::anyhow!(
588                "HTTP upload failed with status: {} - {}",
589                response.status(),
590                response.text().await.unwrap_or_default()
591            ))
592        }
593    }
594
595    pub async fn save_with_s3_like(
596        formatter: Arc<dyn CallRecordFormatter>,
597        vendor: &S3Vendor,
598        bucket: &String,
599        region: &String,
600        access_key: &String,
601        secret_key: &String,
602        endpoint: &String,
603        with_media: &Option<bool>,
604        keep_media_copy: &Option<bool>,
605        record: &CallRecord,
606    ) -> Result<String> {
607        let start_time = Instant::now();
608        let object_store =
609            build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
610
611        // Serialize call record to JSON
612        let call_log_json = formatter.format(record)?;
613        // Upload call log JSON
614        let filename = formatter.format_file_name(record);
615        let local_files = vec![filename.clone()];
616        let json_path = ObjectPath::from(filename);
617        let buf_size = call_log_json.len();
618        match object_store.put(&json_path, call_log_json.into()).await {
619            Ok(_) => {
620                info!(
621                    elapsed = start_time.elapsed().as_secs_f64(),
622                    %json_path,
623                    buf_size,
624                    "upload call record"
625                );
626            }
627            Err(e) => {
628                warn!(
629                   %json_path,
630                    "failed to upload call record: {}", e
631                );
632            }
633        }
634        // Upload media files if with_media is true
635        if with_media.unwrap_or(false) {
636            let mut media_files = vec![];
637            for media in &record.recorder {
638                if Path::new(&media.path).exists() {
639                    let media_path = ObjectPath::from(formatter.format_media_path(record, media));
640                    media_files.push((media.path.clone(), media_path));
641                }
642            }
643            if let Some(dump_events_file) = &record.dump_event_file {
644                if Path::new(&dump_events_file).exists() {
645                    let dump_events_path =
646                        ObjectPath::from(formatter.format_dump_events_path(record));
647                    media_files.push((dump_events_file.clone(), dump_events_path));
648                }
649            }
650            for (path, media_path) in &media_files {
651                let start_time = Instant::now();
652                let file_content = match tokio::fs::read(path).await {
653                    Ok(file_content) => file_content,
654                    Err(e) => {
655                        warn!("failed to read media file {}: {}", path, e);
656                        continue;
657                    }
658                };
659                let buf_size = file_content.len();
660                match object_store.put(media_path, file_content.into()).await {
661                    Ok(_) => {
662                        info!(
663                            elapsed = start_time.elapsed().as_secs_f64(),
664                            %media_path,
665                            buf_size,
666                            "upload media file"
667                        );
668                    }
669                    Err(e) => {
670                        warn!(%media_path,"failed to upload media file: {}", e);
671                    }
672                }
673            }
674        }
675        // Optionally delete local media files if keep_media_copy is false
676        if !keep_media_copy.unwrap_or(false) {
677            for media in &record.recorder {
678                let p = Path::new(&media.path);
679                if p.exists() {
680                    tokio::fs::remove_file(p).await.ok();
681                }
682            }
683            for file_name in &local_files {
684                let p = Path::new(file_name);
685                if p.exists() {
686                    tokio::fs::remove_file(p).await.ok();
687                }
688            }
689        }
690
691        Ok(format!(
692            "{}/{}",
693            endpoint.trim_end_matches('/'),
694            json_path.to_string().trim_start_matches('/')
695        ))
696    }
697
698    pub async fn serve(&mut self) {
699        let token = self.cancel_token.clone();
700        info!("CallRecordManager serving");
701        tokio::select! {
702            _ = token.cancelled() => {}
703            _ = self.recv_loop() => {}
704        }
705        info!("CallRecordManager served");
706    }
707
708    async fn recv_loop(&mut self) -> Result<()> {
709        let mut futures = FuturesUnordered::new();
710        loop {
711            let limit = self.max_concurrent - futures.len();
712            if limit == 0 {
713                if let Some(_) = futures.next().await {}
714                continue;
715            }
716            let mut buffer = Vec::with_capacity(limit);
717            if self.receiver.recv_many(&mut buffer, limit).await == 0 {
718                break;
719            }
720
721            for record in buffer {
722                let cancel_token_ref = self.cancel_token.clone();
723                let save_fn_ref = self.saver_fn.clone();
724                let config_ref = self.config.clone();
725                let formatter_ref = self.formatter.clone();
726
727                futures.push(async move {
728                    if let Err(e) =
729                        save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
730                    {
731                        warn!("Failed to save call record: {}", e);
732                    }
733                });
734            }
735            while let Some(_) = futures.next().await {}
736        }
737        Ok(())
738    }
739}