active_call/callrecord/
mod.rs

1use crate::CallOption;
2use crate::{
3    call::ActiveCallType,
4    config::{CallRecordConfig, S3Vendor},
5};
6use anyhow::Result;
7use chrono::{DateTime, Local, Utc};
8use futures::stream::{FuturesUnordered, StreamExt};
9use object_store::PutPayload;
10use object_store::{
11    ObjectStore, ObjectStoreExt, aws::AmazonS3Builder, azure::MicrosoftAzureBuilder,
12    gcp::GoogleCloudStorageBuilder, path::Path as ObjectPath,
13};
14use reqwest;
15use serde::{Deserialize, Serialize};
16use serde_with::skip_serializing_none;
17use std::{
18    collections::HashMap, future::Future, path::Path, pin::Pin, str::FromStr, sync::Arc,
19    time::Instant,
20};
21use tokio::{fs::File, io::AsyncWriteExt};
22use tokio_util::sync::CancellationToken;
23use tracing::{info, warn};
24
/// Sending half of the unbounded channel that delivers [`CallRecord`]s to a
/// [`CallRecordManager`] (exposed as `CallRecordManager::sender`).
pub type CallRecordSender = tokio::sync::mpsc::UnboundedSender<CallRecord>;
/// Receiving half of the unbounded [`CallRecord`] channel; drained by the
/// manager's receive loop.
pub type CallRecordReceiver = tokio::sync::mpsc::UnboundedReceiver<CallRecord>;
27
/// Shareable, thread-safe callback used by [`CallRecordManager`] to persist a
/// single [`CallRecord`].
///
/// The callback receives, in order: the manager's cancellation token, the
/// formatter, the call-record configuration and the record itself. It returns a
/// boxed `Send` future that resolves to `Ok(())` on success or an error on
/// failure.
pub type FnSaveCallRecord = Arc<
    Box<
        dyn Fn(
                CancellationToken,
                Arc<dyn CallRecordFormatter>,
                Arc<CallRecordConfig>,
                CallRecord,
            ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>
            + Send
            + Sync,
    >,
>;
40
/// Category tag stored in every [`CallRecordEvent`].
///
/// Serialized by serde using camelCase variant names (`"event"`, `"command"`,
/// `"sip"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordEventType {
    Event,
    Command,
    // NOTE(review): presumably a raw SIP message — confirm against the producers.
    Sip,
}
48
/// One entry of an event dump; [`CallRecordEvent::write`] emits each entry as a
/// single JSON line.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordEvent {
    /// Category of the entry.
    pub r#type: CallRecordEventType,
    /// Value of `crate::media::get_timestamp()` at the time the entry was built.
    // NOTE(review): unit/epoch is defined by `get_timestamp` — not visible here.
    pub timestamp: u64,
    /// The payload, already serialized to a JSON string.
    pub content: String,
}
56
57impl CallRecordEvent {
58    pub async fn write<T: Serialize>(r#type: CallRecordEventType, obj: T, file: &mut File) {
59        let content = match serde_json::to_string(&obj) {
60            Ok(s) => s,
61            Err(_) => return,
62        };
63        let event = Self {
64            r#type,
65            timestamp: crate::media::get_timestamp(),
66            content,
67        };
68        match serde_json::to_string(&event) {
69            Ok(line) => {
70                file.write_all(format!("{}\n", line).as_bytes()).await.ok();
71            }
72            Err(_) => {}
73        }
74    }
75}
76
/// Call detail record handed to [`CallRecordManager`] for persistence.
///
/// Serialized with camelCase field names; `None` options are omitted from the
/// output (`skip_serializing_none`) and empty vectors are skipped.
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct CallRecord {
    pub call_type: ActiveCallType,
    pub option: Option<CallOption>,
    /// Identifier of the call; used in generated file and object names.
    pub call_id: String,
    /// Start of the call; the date/time part of generated file names is derived
    /// from this value converted to the local time zone.
    pub start_time: DateTime<Utc>,
    pub ring_time: Option<DateTime<Utc>>,
    pub answer_time: Option<DateTime<Utc>>,
    pub end_time: DateTime<Utc>,
    pub caller: String,
    pub callee: String,
    // NOTE(review): presumably the final SIP status code — confirm with the producer.
    pub status_code: u16,
    pub hangup_reason: Option<CallRecordHangupReason>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub hangup_messages: Vec<CallRecordHangupMessage>,
    /// Media files recorded for this call; the S3 and HTTP savers upload them
    /// when `with_media` is enabled.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub recorder: Vec<CallRecordMedia>,
    pub extras: Option<HashMap<String, serde_json::Value>>,
    /// Local path of the event dump file, if any; uploaded alongside the media.
    pub dump_event_file: Option<String>,
    // NOTE(review): presumably the record of a referred (transferred) call — confirm.
    pub refer_callrecord: Option<Box<CallRecord>>,
}
102
/// Description of one media file attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordMedia {
    /// Track identifier; becomes part of the uploaded object / form-part name.
    pub track_id: String,
    /// Local filesystem path of the media file (the savers read it from disk).
    pub path: String,
    // NOTE(review): presumably the file size in bytes — confirm with the producer.
    pub size: u64,
    /// Optional free-form metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, serde_json::Value>>,
}
112
/// Why a call ended.
///
/// Serde serializes the variants with camelCase names. The `FromStr` and
/// string-conversion impls in this file use their own short names instead
/// (for example `"caller"` for [`CallRecordHangupReason::ByCaller`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum CallRecordHangupReason {
    ByCaller,
    ByCallee,
    ByRefer,
    BySystem,
    Autohangup,
    InactivityTimeout,
    NoAnswer,
    NoBalance,
    AnswerMachine,
    ServerUnavailable,
    Canceled,
    Rejected,
    Failed,
    /// Any reason not covered above; carries the original text.
    Other(String),
}
131
/// One hang-up related message attached to a [`CallRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallRecordHangupMessage {
    // NOTE(review): presumably a SIP response code — confirm with the producer.
    pub code: u16,
    /// Optional reason text; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional target; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<String>,
}
141
142impl FromStr for CallRecordHangupReason {
143    type Err = anyhow::Error;
144
145    fn from_str(s: &str) -> Result<Self, Self::Err> {
146        match s.to_lowercase().as_str() {
147            "caller" => Ok(Self::ByCaller),
148            "callee" => Ok(Self::ByCallee),
149            "refer" => Ok(Self::ByRefer),
150            "system" => Ok(Self::BySystem),
151            "autohangup" => Ok(Self::Autohangup),
152            "inactivitytimeout" => Ok(Self::InactivityTimeout),
153            "noAnswer" => Ok(Self::NoAnswer),
154            "noBalance" => Ok(Self::NoBalance),
155            "answerMachine" => Ok(Self::AnswerMachine),
156            "serverUnavailable" => Ok(Self::ServerUnavailable),
157            "canceled" => Ok(Self::Canceled),
158            "rejected" => Ok(Self::Rejected),
159            "failed" => Ok(Self::Failed),
160            _ => Ok(Self::Other(s.to_string())),
161        }
162    }
163}
164impl ToString for CallRecordHangupReason {
165    fn to_string(&self) -> String {
166        match self {
167            Self::ByCaller => "caller".to_string(),
168            Self::ByCallee => "callee".to_string(),
169            Self::ByRefer => "refer".to_string(),
170            Self::BySystem => "system".to_string(),
171            Self::Autohangup => "autohangup".to_string(),
172            Self::InactivityTimeout => "inactivityTimeout".to_string(),
173            Self::NoAnswer => "noAnswer".to_string(),
174            Self::NoBalance => "noBalance".to_string(),
175            Self::AnswerMachine => "answerMachine".to_string(),
176            Self::ServerUnavailable => "serverUnavailable".to_string(),
177            Self::Canceled => "canceled".to_string(),
178            Self::Rejected => "rejected".to_string(),
179            Self::Failed => "failed".to_string(),
180            Self::Other(s) => s.to_string(),
181        }
182    }
183}
184
185pub fn default_cdr_file_name(record: &CallRecord) -> String {
186    format!(
187        "{}_{}.json",
188        record
189            .start_time
190            .with_timezone(&Local)
191            .format("%Y%m%d-%H%M%S"),
192        record.call_id
193    )
194}
195
196pub trait CallRecordFormatter: Send + Sync {
197    fn format(&self, record: &CallRecord) -> Result<String> {
198        Ok(serde_json::to_string(record)?)
199    }
200    fn format_file_name(&self, record: &CallRecord) -> String;
201    fn format_dump_events_path(&self, record: &CallRecord) -> String;
202    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String;
203}
204
/// Built-in [`CallRecordFormatter`] that lays files out as
/// `<root>/<YYYYMMDD>/<name>`.
pub struct DefaultCallRecordFormatter {
    /// Prefix prepended to every generated path; trailing `/` characters are
    /// trimmed before it is used.
    pub root: String,
}
208
209impl Default for DefaultCallRecordFormatter {
210    fn default() -> Self {
211        Self {
212            root: "./config/cdr".to_string(),
213        }
214    }
215}
216
217impl DefaultCallRecordFormatter {
218    pub fn new_with_config(config: &CallRecordConfig) -> Self {
219        let root = match config {
220            CallRecordConfig::Local { root } => root.clone(),
221            CallRecordConfig::S3 { root, .. } => root.clone(),
222            _ => "./config/cdr".to_string(),
223        };
224        Self { root }
225    }
226}
227
228impl CallRecordFormatter for DefaultCallRecordFormatter {
229    fn format_file_name(&self, record: &CallRecord) -> String {
230        let trimmed_root = self.root.trim_end_matches('/');
231        let file_name = default_cdr_file_name(record);
232        if trimmed_root.is_empty() {
233            file_name
234        } else {
235            format!(
236                "{}/{}/{}",
237                trimmed_root,
238                record.start_time.with_timezone(&Local).format("%Y%m%d"),
239                file_name
240            )
241        }
242    }
243
244    fn format_dump_events_path(&self, record: &CallRecord) -> String {
245        format!(
246            "{}/{}/{}.jsonl",
247            self.root.trim_end_matches('/'),
248            record.start_time.with_timezone(&Local).format("%Y%m%d"),
249            record.call_id
250        )
251    }
252
253    fn format_media_path(&self, record: &CallRecord, media: &CallRecordMedia) -> String {
254        let file_name = Path::new(&media.path)
255            .file_name()
256            .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
257            .to_string_lossy()
258            .to_string();
259
260        format!(
261            "{}/{}/{}_{}_{}",
262            self.root.trim_end_matches('/'),
263            record.start_time.with_timezone(&Local).format("%Y%m%d"),
264            record.call_id,
265            media.track_id,
266            file_name
267        )
268    }
269}
270
271pub fn build_object_store_from_s3(
272    vendor: &S3Vendor,
273    bucket: &str,
274    region: &str,
275    access_key: &str,
276    secret_key: &str,
277    endpoint: &str,
278) -> Result<Arc<dyn ObjectStore>> {
279    let store: Arc<dyn ObjectStore> = match vendor {
280        S3Vendor::AWS => {
281            let builder = AmazonS3Builder::new()
282                .with_bucket_name(bucket)
283                .with_region(region)
284                .with_access_key_id(access_key)
285                .with_secret_access_key(secret_key);
286
287            let instance = if !endpoint.is_empty() {
288                builder.with_endpoint(endpoint).build()?
289            } else {
290                builder.build()?
291            };
292            Arc::new(instance)
293        }
294        S3Vendor::GCP => {
295            let instance = GoogleCloudStorageBuilder::new()
296                .with_bucket_name(bucket)
297                .with_service_account_key(secret_key)
298                .build()?;
299            Arc::new(instance)
300        }
301        S3Vendor::Azure => {
302            let instance = MicrosoftAzureBuilder::new()
303                .with_container_name(bucket)
304                .with_account(access_key)
305                .with_access_key(secret_key)
306                .build()?;
307            Arc::new(instance)
308        }
309        S3Vendor::Aliyun | S3Vendor::Tencent | S3Vendor::Minio | S3Vendor::DigitalOcean => {
310            let instance = AmazonS3Builder::new()
311                .with_bucket_name(bucket)
312                .with_region(region)
313                .with_access_key_id(access_key)
314                .with_secret_access_key(secret_key)
315                .with_endpoint(endpoint)
316                .with_virtual_hosted_style_request(false)
317                .build()?;
318            Arc::new(instance)
319        }
320    };
321
322    Ok(store)
323}
324
/// Receives [`CallRecord`]s over a channel and persists them with a pluggable
/// saver function. Construct it with [`CallRecordManagerBuilder`].
pub struct CallRecordManager {
    /// Upper bound used by the receive loop when batching records.
    pub max_concurrent: usize,
    /// Sending half of the record channel; records sent here are picked up by
    /// `serve`.
    pub sender: CallRecordSender,
    // Storage configuration handed to the saver for every record.
    config: Arc<CallRecordConfig>,
    // Cancelling this token makes `serve` return.
    cancel_token: CancellationToken,
    receiver: CallRecordReceiver,
    // Callback that performs the actual persistence.
    saver_fn: FnSaveCallRecord,
    formatter: Arc<dyn CallRecordFormatter>,
}
334
/// Builder for [`CallRecordManager`]; every field is optional and receives a
/// default in `build` when left unset.
pub struct CallRecordManagerBuilder {
    pub cancel_token: Option<CancellationToken>,
    pub config: Option<CallRecordConfig>,
    pub max_concurrent: Option<usize>,
    // Custom persistence callback; set through `with_saver`.
    saver_fn: Option<FnSaveCallRecord>,
    // Custom formatter; set through `with_formatter`.
    formatter: Option<Arc<dyn CallRecordFormatter>>,
}
342
343impl CallRecordManagerBuilder {
344    pub fn new() -> Self {
345        Self {
346            cancel_token: None,
347            config: None,
348            max_concurrent: None,
349            saver_fn: None,
350            formatter: None,
351        }
352    }
353
354    pub fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
355        self.cancel_token = Some(cancel_token);
356        self
357    }
358
359    pub fn with_config(mut self, config: CallRecordConfig) -> Self {
360        self.config = Some(config);
361        self
362    }
363
364    pub fn with_saver(mut self, saver: FnSaveCallRecord) -> Self {
365        self.saver_fn = Some(saver);
366        self
367    }
368
369    pub fn with_formatter(mut self, formatter: Arc<dyn CallRecordFormatter>) -> Self {
370        self.formatter = Some(formatter);
371        self
372    }
373
374    pub fn with_max_concurrent(mut self, max_concurrent: usize) -> Self {
375        self.max_concurrent = Some(max_concurrent);
376        self
377    }
378
379    pub fn build(self) -> CallRecordManager {
380        let cancel_token = self.cancel_token.unwrap_or_default();
381        let config = Arc::new(self.config.unwrap_or_default());
382        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
383        let saver_fn = self
384            .saver_fn
385            .unwrap_or_else(|| Arc::new(Box::new(CallRecordManager::default_saver)));
386        let formatter = self
387            .formatter
388            .unwrap_or_else(|| Arc::new(DefaultCallRecordFormatter::default()));
389        let max_concurrent = self.max_concurrent.unwrap_or(64);
390
391        match config.as_ref() {
392            CallRecordConfig::Local { root } => {
393                if !Path::new(&root).exists() {
394                    match std::fs::create_dir_all(&root) {
395                        Ok(_) => {
396                            info!("CallRecordManager created directory: {}", root);
397                        }
398                        Err(e) => {
399                            warn!("CallRecordManager failed to create directory: {}", e);
400                        }
401                    }
402                }
403            }
404            _ => {}
405        }
406
407        CallRecordManager {
408            max_concurrent,
409            cancel_token,
410            sender,
411            receiver,
412            config,
413            saver_fn,
414            formatter,
415        }
416    }
417}
418
419impl CallRecordManager {
420    fn default_saver(
421        _cancel_token: CancellationToken,
422        formatter: Arc<dyn CallRecordFormatter>,
423        config: Arc<CallRecordConfig>,
424        record: CallRecord,
425    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
426        Box::pin(async move {
427            let mut record = record;
428            let start_time = Instant::now();
429            let result = match config.as_ref() {
430                CallRecordConfig::Local { .. } => {
431                    Self::save_local_record(formatter.clone(), &mut record).await
432                }
433                CallRecordConfig::S3 {
434                    vendor,
435                    bucket,
436                    region,
437                    access_key,
438                    secret_key,
439                    endpoint,
440                    with_media,
441                    keep_media_copy,
442                    ..
443                } => {
444                    Self::save_with_s3_like(
445                        formatter.clone(),
446                        vendor,
447                        bucket,
448                        region,
449                        access_key,
450                        secret_key,
451                        endpoint,
452                        with_media,
453                        keep_media_copy,
454                        &record,
455                    )
456                    .await
457                }
458                CallRecordConfig::Http {
459                    url,
460                    headers,
461                    with_media,
462                    keep_media_copy,
463                } => {
464                    Self::save_with_http(
465                        formatter.clone(),
466                        url,
467                        headers,
468                        with_media,
469                        keep_media_copy,
470                        &record,
471                    )
472                    .await
473                }
474            };
475            let file_name = match result {
476                Ok(file_name) => file_name,
477                Err(e) => {
478                    warn!("Failed to save call record: {}", e);
479                    return Err(e);
480                }
481            };
482            let elapsed = start_time.elapsed();
483            info!(
484                ?elapsed,
485                call_id = record.call_id,
486                file_name,
487                "CallRecordManager saved"
488            );
489            Ok(())
490        })
491    }
492
493    async fn save_local_record(
494        formatter: Arc<dyn CallRecordFormatter>,
495        record: &mut CallRecord,
496    ) -> Result<String> {
497        let file_content = formatter.format(record)?;
498        let file_name = formatter.format_file_name(record);
499
500        // Ensure parent directory exists
501        if let Some(parent) = Path::new(&file_name).parent() {
502            tokio::fs::create_dir_all(parent).await.map_err(|e| {
503                anyhow::anyhow!("Failed to create parent directory for {}: {}", file_name, e)
504            })?;
505        }
506
507        let mut file = File::create(&file_name).await.map_err(|e| {
508            anyhow::anyhow!("Failed to create call record file {}: {}", file_name, e)
509        })?;
510        file.write_all(file_content.as_bytes()).await?;
511        file.flush().await?;
512        Ok(file_name.to_string())
513    }
514
515    pub async fn save_with_http(
516        formatter: Arc<dyn CallRecordFormatter>,
517        url: &String,
518        headers: &Option<HashMap<String, String>>,
519        with_media: &Option<bool>,
520        keep_media_copy: &Option<bool>,
521        record: &CallRecord,
522    ) -> Result<String> {
523        let client = reqwest::Client::new();
524        // Serialize call record to JSON
525        let call_log_json = formatter.format(record)?;
526        // Create multipart form
527        let mut form = reqwest::multipart::Form::new().text("calllog.json", call_log_json);
528
529        // Add media files if with_media is true
530        if with_media.unwrap_or(false) {
531            for media in &record.recorder {
532                if std::path::Path::new(&media.path).exists() {
533                    match tokio::fs::read(&media.path).await {
534                        Ok(file_content) => {
535                            let file_name = std::path::Path::new(&media.path)
536                                .file_name()
537                                .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
538                                .to_string_lossy()
539                                .to_string();
540
541                            let part = match reqwest::multipart::Part::bytes(file_content)
542                                .file_name(file_name.clone())
543                                .mime_str("application/octet-stream")
544                            {
545                                Ok(part) => part,
546                                Err(_) => {
547                                    // Fallback to default MIME type if parsing fails
548                                    reqwest::multipart::Part::bytes(
549                                        tokio::fs::read(&media.path).await?,
550                                    )
551                                    .file_name(file_name)
552                                }
553                            };
554
555                            form = form.part(format!("media_{}", media.track_id), part);
556                        }
557                        Err(e) => {
558                            warn!("Failed to read media file {}: {}", media.path, e);
559                        }
560                    }
561                }
562            }
563            if let Some(dump_events_file) = &record.dump_event_file {
564                if Path::new(&dump_events_file).exists() {
565                    let file_name = Path::new(&dump_events_file)
566                        .file_name()
567                        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
568                        .to_string_lossy()
569                        .to_string();
570                    match reqwest::multipart::Part::bytes(tokio::fs::read(&dump_events_file).await?)
571                        .file_name(file_name.clone())
572                        .mime_str("application/octet-stream")
573                    {
574                        Ok(part) => {
575                            form = form.part(format!("dump_events_{}", file_name), part);
576                        }
577                        Err(_) => {}
578                    };
579                }
580            }
581        }
582        let mut request = client.post(url).multipart(form);
583        if let Some(headers_map) = headers {
584            for (key, value) in headers_map {
585                request = request.header(key, value);
586            }
587        }
588        let response = request.send().await?;
589        if response.status().is_success() {
590            let response_text = response.text().await.unwrap_or_default();
591
592            if keep_media_copy.unwrap_or(false) {
593                for media in &record.recorder {
594                    let p = Path::new(&media.path);
595                    if p.exists() {
596                        tokio::fs::remove_file(p).await.ok();
597                    }
598                }
599            }
600            Ok(format!("HTTP upload successful: {}", response_text))
601        } else {
602            Err(anyhow::anyhow!(
603                "HTTP upload failed with status: {} - {}",
604                response.status(),
605                response.text().await.unwrap_or_default()
606            ))
607        }
608    }
609
610    pub async fn save_with_s3_like(
611        formatter: Arc<dyn CallRecordFormatter>,
612        vendor: &S3Vendor,
613        bucket: &String,
614        region: &String,
615        access_key: &String,
616        secret_key: &String,
617        endpoint: &String,
618        with_media: &Option<bool>,
619        keep_media_copy: &Option<bool>,
620        record: &CallRecord,
621    ) -> Result<String> {
622        let start_time = Instant::now();
623        let object_store =
624            build_object_store_from_s3(vendor, bucket, region, access_key, secret_key, endpoint)?;
625
626        // Serialize call record to JSON
627        let call_log_json = formatter.format(record)?;
628        // Upload call log JSON
629        let filename = formatter.format_file_name(record);
630        let local_files = vec![filename.clone()];
631        let json_path = ObjectPath::from(filename);
632        let buf_size = call_log_json.len();
633        match object_store
634            .put(&json_path, PutPayload::from(call_log_json))
635            .await
636        {
637            Ok(_) => {
638                info!(
639                    elapsed = start_time.elapsed().as_secs_f64(),
640                    %json_path,
641                    buf_size,
642                    "upload call record"
643                );
644            }
645            Err(e) => {
646                warn!(
647                   %json_path,
648                    "failed to upload call record: {}", e
649                );
650            }
651        }
652        // Upload media files if with_media is true
653        if with_media.unwrap_or(false) {
654            let mut media_files = vec![];
655            for media in &record.recorder {
656                if Path::new(&media.path).exists() {
657                    let media_path = ObjectPath::from(formatter.format_media_path(record, media));
658                    media_files.push((media.path.clone(), media_path));
659                }
660            }
661            if let Some(dump_events_file) = &record.dump_event_file {
662                if Path::new(&dump_events_file).exists() {
663                    let dump_events_path =
664                        ObjectPath::from(formatter.format_dump_events_path(record));
665                    media_files.push((dump_events_file.clone(), dump_events_path));
666                }
667            }
668            for (path, media_path) in &media_files {
669                let start_time = Instant::now();
670                let file_content = match tokio::fs::read(path).await {
671                    Ok(file_content) => file_content,
672                    Err(e) => {
673                        warn!("failed to read media file {}: {}", path, e);
674                        continue;
675                    }
676                };
677                let buf_size = file_content.len();
678                match object_store
679                    .put(media_path, PutPayload::from(file_content))
680                    .await
681                {
682                    Ok(_) => {
683                        info!(
684                            elapsed = start_time.elapsed().as_secs_f64(),
685                            %media_path,
686                            buf_size,
687                            "upload media file"
688                        );
689                    }
690                    Err(e) => {
691                        warn!(%media_path,"failed to upload media file: {}", e);
692                    }
693                }
694            }
695        }
696        // Optionally delete local media files if keep_media_copy is false
697        if !keep_media_copy.unwrap_or(false) {
698            for media in &record.recorder {
699                let p = Path::new(&media.path);
700                if p.exists() {
701                    tokio::fs::remove_file(p).await.ok();
702                }
703            }
704            for file_name in &local_files {
705                let p = Path::new(file_name);
706                if p.exists() {
707                    tokio::fs::remove_file(p).await.ok();
708                }
709            }
710        }
711
712        Ok(format!(
713            "{}/{}",
714            endpoint.trim_end_matches('/'),
715            json_path.to_string().trim_start_matches('/')
716        ))
717    }
718
719    pub async fn serve(&mut self) {
720        let token = self.cancel_token.clone();
721        info!("CallRecordManager serving");
722        tokio::select! {
723            _ = token.cancelled() => {}
724            _ = self.recv_loop() => {}
725        }
726        info!("CallRecordManager served");
727    }
728
729    async fn recv_loop(&mut self) -> Result<()> {
730        let mut futures = FuturesUnordered::new();
731        loop {
732            let limit = self.max_concurrent - futures.len();
733            if limit == 0 {
734                if let Some(_) = futures.next().await {}
735                continue;
736            }
737            let mut buffer = Vec::with_capacity(limit);
738            if self.receiver.recv_many(&mut buffer, limit).await == 0 {
739                break;
740            }
741
742            for record in buffer {
743                let cancel_token_ref = self.cancel_token.clone();
744                let save_fn_ref = self.saver_fn.clone();
745                let config_ref = self.config.clone();
746                let formatter_ref = self.formatter.clone();
747
748                futures.push(async move {
749                    if let Err(e) =
750                        save_fn_ref(cancel_token_ref, formatter_ref, config_ref, record).await
751                    {
752                        warn!("Failed to save call record: {}", e);
753                    }
754                });
755            }
756            while let Some(_) = futures.next().await {}
757        }
758        Ok(())
759    }
760}