rama-http 0.3.0-rc1

rama http layers, services and other utilities
use super::Recorder;
use crate::layer::har::spec;
use rama_core::error::{BoxError, ErrorContext};
use rama_core::extensions::{Extension, Extensions};
use rama_core::telemetry::tracing;
use rama_utils::{
    fs::{CreatedFilePermissions, OpenOptions},
    time::now_unix,
};
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;

/// Recorder that can create a file-per-session
/// for actual HAR Recording.
/// Recorder that can create a file-per-session
/// for actual HAR Recording.
///
/// This type is only a handle: it holds the sending half of the channel
/// that feeds the background task spawned by [`FileRecorder::new`], which
/// is the part that actually touches the filesystem. Cloning the recorder
/// clones the sender, so all clones feed the same task.
#[derive(Debug, Clone)]
pub struct FileRecorder {
    /// Channel used to hand `Record` / `Stop` messages to the writer task.
    tx: mpsc::Sender<FileRecorderMessage>,
}

/// Path to (HAR) file that the [`FileRecorder`] is recording into.
///
/// Inserted into the response extensions.
///
/// The path is kept behind an [`Arc`], so cloning this value does not
/// copy the underlying path buffer.
#[derive(Debug, Clone, Extension)]
#[extension(tags(http))]
pub struct HarFilePath(Arc<PathBuf>);

impl AsRef<std::path::Path> for HarFilePath {
    /// Borrow the recorded HAR file location as a plain [`std::path::Path`].
    fn as_ref(&self) -> &std::path::Path {
        // Auto-deref walks through the `Arc` to reach `PathBuf::as_path`.
        self.0.as_path()
    }
}

impl Deref for HarFilePath {
    type Target = std::path::Path;

    /// Expose the wrapped path so `Path` methods can be called directly
    /// on a [`HarFilePath`].
    fn deref(&self) -> &Self::Target {
        // `Arc<PathBuf>` -> `PathBuf` -> `Path`, reborrowed as a shared reference.
        &**self.0
    }
}

/// Messages sent from a [`FileRecorder`] handle to its background writer task.
#[derive(Debug)]
enum FileRecorderMessage {
    /// Append the entries of `log` to the current HAR file, creating a new
    /// file first when none is active.
    Record {
        /// The HAR log to persist (boxed before being sent over the channel).
        log: Box<spec::Log>,
        /// Reply channel: on success the task sends back `Extensions`
        /// containing the [`HarFilePath`] of the file written to. On failure
        /// the sender is dropped without a reply.
        ext: oneshot::Sender<Extensions>,
    },
    /// Finish the currently active HAR file, if any; a later `Record`
    /// starts a fresh file.
    Stop,
}

/// State owned by the background task that writes HAR files to disk.
#[derive(Debug)]
struct FileRecorderTask {
    /// Receiving half of the channel fed by [`FileRecorder`] handles.
    rx: mpsc::Receiver<FileRecorderMessage>,

    /// Directory in which HAR files are created.
    dir: PathBuf,
    /// First component of every generated file name.
    prefix: String,
    /// Instant captured when the task was constructed; the elapsed seconds
    /// since then are embedded in each file name.
    start: Instant,
    /// Value of `now_unix()` captured when the task was constructed; also
    /// embedded in each file name.
    start_epoch: i64,
}

impl FileRecorderTask {
    /// Build a task that receives messages on `rx` and writes HAR files
    /// into `dir`, using `prefix` as the leading part of each file name.
    ///
    /// The construction moment is captured both as an [`Instant`] and as the
    /// value returned by `now_unix()`; both are used when naming files.
    fn new(rx: mpsc::Receiver<FileRecorderMessage>, dir: PathBuf, prefix: String) -> Self {
        Self {
            rx,
            dir,
            prefix,
            start: Instant::now(),
            start_epoch: now_unix(),
        }
    }

    /// Message loop of the writer task.
    ///
    /// Runs until `rx.recv()` yields `None`. At most one HAR file is active
    /// at a time:
    ///
    /// * `Record` lazily creates the file (and writes the JSON header) when
    ///   none is active, then appends every entry of the log as a JSON value
    ///   in the `entries` array, and finally replies with the file path.
    /// * `Stop` closes the JSON document of the active file, so that the
    ///   next `Record` starts a new file.
    ///
    /// Every failure is logged at debug level and handled by skipping to the
    /// next message; in those cases the `ext` reply sender is dropped without
    /// a value being sent.
    async fn run(mut self) {
        /// The HAR file currently being written.
        #[derive(Debug)]
        struct Storage {
            /// Open handle that entries are appended to.
            file: File,
            /// Location of `file`; reported back via [`HarFilePath`].
            path: PathBuf,
            /// Whether at least one entry was written already, i.e. whether
            /// the next entry has to be preceded by a `,` separator.
            has_entries: bool,
        }

        impl Storage {
            /// Create (or truncate) the file at `path`, creating its parent
            /// directory first when the path has one.
            async fn try_new(path: PathBuf) -> Result<Self, BoxError> {
                if let Some(parent) = path.parent() {
                    create_har_parent_dir(parent)
                        .await
                        .context("create HAR file parent dir")?;
                }
                // HAR archives can record `Authorization`, `Cookie`, request bodies, and
                // other secrets. On Unix we apply `0o600` at creation so the bytes never
                // exist on disk world-/group-readable. On Windows the file inherits the
                // parent dir's ACL — locking that down is out of scope here.
                let file = OpenOptions::new()
                    .write(true)
                    .create(true)
                    .truncate(true)
                    .created_file_permissions(CreatedFilePermissions::OwnerReadWrite)
                    .open(&path)
                    .await
                    .context("create HAR file")?;
                Ok(Self {
                    file,
                    path,
                    has_entries: false,
                })
            }
        }

        /// Create the HAR file's parent directory. On Unix, freshly-created
        /// components are mode `0o700` so an attacker who can list the parent
        /// can't enumerate the recorded archive. Pre-existing components are
        /// left untouched (mirrors `fs::create_dir_all` semantics).
        async fn create_har_parent_dir(parent: &std::path::Path) -> std::io::Result<()> {
            #[cfg(unix)]
            {
                use std::os::unix::fs::DirBuilderExt as _;
                let mut b = std::fs::DirBuilder::new();
                b.recursive(true).mode(0o700);
                // `DirBuilder::create` is blocking, so run it off the async
                // worker; the path is cloned to move it into the closure.
                let parent = parent.to_owned();
                // First `?` surfaces a failed blocking task (mapped to an
                // `io::Error`), the second the directory creation error.
                tokio::task::spawn_blocking(move || b.create(&parent))
                    .await
                    .map_err(std::io::Error::other)??;
                Ok(())
            }
            #[cfg(not(unix))]
            {
                tokio::fs::create_dir_all(parent).await
            }
        }

        // Active file, if any. `None` before the first record, after `Stop`,
        // and after any write failure.
        let mut storage: Option<Storage> = None;
        // Number of file creations attempted so far; part of the file name.
        // It is bumped before the fallible steps, so failed attempts consume
        // a number as well.
        let mut counter = 0;
        // Scratch buffer reused for every serialized header / entry.
        let mut buf = Vec::new();

        'msg_loop: while let Some(msg) = self.rx.recv().await {
            match msg {
                FileRecorderMessage::Record { log, ext } => {
                    // Reuse the active file, or create one and write its header.
                    let storage_ref = if let Some(sr) = storage.as_mut() {
                        sr
                    } else {
                        storage = Some(
                            match async {
                                // File name layout:
                                // `{prefix}_{start_epoch}_{counter}_{elapsed_secs}.har`
                                let file_name = format!(
                                    "{}_{}_{}_{}.har",
                                    self.prefix,
                                    self.start_epoch,
                                    {
                                        let i = counter;
                                        counter += 1;
                                        i
                                    },
                                    self.start.elapsed().as_secs()
                                );
                                // The directory is created before the path is
                                // validated against it.
                                create_har_parent_dir(&self.dir)
                                    .await
                                    .context("create HAR recording dir")?;
                                let path = rama_utils::fs::safe_path_in(&self.dir, file_name)
                                    .await
                                    .context("validate HAR file path")?;
                                Storage::try_new(path).await
                            }
                            .await
                            {
                                Err(err) => {
                                    tracing::debug!(
                                        "failed to create file for HAR recording: {err} (ignore log entry)"
                                    );
                                    // `ext` is dropped here without a reply.
                                    continue 'msg_loop;
                                }
                                Ok(storage) => storage,
                            },
                        );
                        #[expect(
                            clippy::expect_used,
                            reason = "it was assigned in the previous assignment as Some"
                        )]
                        // NOTE: not the cleanest design in this file, but ok for now
                        let storage_ref = storage
                            .as_mut()
                            .expect("storage to be some due to previous statement");

                        // Serialize the document header from the first log of
                        // this file. The nested object is serialized complete
                        // and then re-opened below so entries can be streamed.
                        buf.clear();
                        let header = serde_json::json!({
                            "log": {
                                "version": log.version,
                                "creator": log.creator,
                                "browser": log.browser,
                                "comment": log.comment,
                                "pages": [], // pages is required, even if we do not support it
                            },
                        });
                        if let Err(err) = serde_json::to_writer(&mut buf, &header) {
                            tracing::debug!(
                                "failed to serialize initial json content for HAR log: {err} (drop file)"
                            );
                            storage = None;
                            continue 'msg_loop;
                        }
                        // Strip the two closing braces (root + `log` object) and
                        // open the `entries` array instead; `finish_file` writes
                        // the matching `]}}` when the file is closed.
                        buf.truncate(buf.len() - 2); // '}}'
                        _ = write!(buf, ",\"entries\":["); // cannot fail (unless something like OOM)
                        if let Err(err) = storage_ref.file.write_all(&buf).await {
                            tracing::debug!(
                                "failed to write initial json content for HAR log: {err} (drop file)"
                            );
                            // The handle is dropped as-is; no closing brackets
                            // are written on this path.
                            storage = None;
                            continue 'msg_loop;
                        }

                        storage_ref
                    };

                    // Pages are never written (the header always carries an
                    // empty `pages` array); a non-empty list is only reported.
                    if log.pages.map(|p| !p.is_empty()).unwrap_or_default() {
                        tracing::debug!(
                            "log contains pages which are not supported by the har recorder!"
                        );
                    }

                    // Append each entry. Any failure finishes the file (closing
                    // brackets are still attempted), clears the active storage
                    // and skips the rest of this log without replying.
                    for entry in log.entries.iter() {
                        tracing::trace!("har log file writer: write entry: {entry:?}");
                        buf.clear();
                        match serde_json::to_writer(&mut buf, entry) {
                            Ok(_) => {
                                // A separator is only needed from the second
                                // entry of the file onwards.
                                if storage_ref.has_entries
                                    && let Err(err) = storage_ref.file.write_u8(b',').await
                                {
                                    tracing::debug!("failed to write entry separator: {err}");
                                    #[expect(clippy::expect_used)]
                                    finish_file(
                                        storage
                                            .take()
                                            .expect("storage to exist as we have reference to it")
                                            .file,
                                    )
                                    .await;
                                    continue 'msg_loop;
                                } else if let Err(err) = storage_ref.file.write_all(&buf).await {
                                    tracing::debug!("failed to write serialized entry: {err}");
                                    #[expect(clippy::expect_used)]
                                    finish_file(
                                        storage
                                            .take()
                                            .expect("storage to exist as we have reference to it")
                                            .file,
                                    )
                                    .await;
                                    continue 'msg_loop;
                                } else {
                                    storage_ref.has_entries = true;
                                }
                            }
                            Err(err) => {
                                tracing::debug!(
                                    "failed entry ({entry:?}) due to json serialize error: {err}"
                                );
                                #[expect(clippy::expect_used)]
                                finish_file(
                                    storage
                                        .take()
                                        .expect("storage to exist as we have reference to it")
                                        .file,
                                )
                                .await;
                                continue 'msg_loop;
                            }
                        }
                    }

                    // All entries were handed to the file: report its path back.
                    let extensions = Extensions::new();
                    extensions.insert(HarFilePath(storage_ref.path.clone().into()));
                    if ext.send(extensions).is_err() {
                        tracing::debug!(
                            "failed to send http extensions w/ har file path back to recorder callee"
                        );
                    }
                }
                FileRecorderMessage::Stop => {
                    // Taking the storage leaves `None` behind, so the next
                    // `Record` message creates a new file.
                    if let Some(storage) = storage.take() {
                        tracing::trace!(
                            "FileRecorderMessage::Stop recieved: finish file {:?}",
                            storage.path
                        );
                        finish_file(storage.file).await;
                    } else {
                        tracing::debug!(
                            "FileRecorderMessage::Stop received while no session active: ignore"
                        );
                    }
                }
            }
        }
        // The receive loop ended while a file was still open: close its JSON
        // document so the file is not left truncated.
        if let Some(storage) = storage {
            tracing::trace!(
                "FileRecorder task exiting: file '{:?}' was still active: finish file",
                storage.path
            );
            finish_file(storage.file).await;
        }
    }
}

/// Close the JSON document of a HAR file and make sure the bytes are
/// handed to the operating system before the handle is dropped.
///
/// The trailer closes, in order, the `entries` array, the `log` object and
/// the root object that were left open when the file header was written.
///
/// Failures are only logged (best effort): the recorder has no caller to
/// report them to at this point.
async fn finish_file(mut file: File) {
    // ] entries > } log > } root
    if let Err(err) = file.write_all(b"]}}").await {
        tracing::debug!("failed to write trailing characters for finished har file: {err}");
    }
    // `tokio::fs::File` performs writes in the background: without an explicit
    // flush the trailer may still be in flight when the handle is dropped, and
    // an error of that last write would never be observed.
    if let Err(err) = file.flush().await {
        tracing::debug!("failed to flush finished har file: {err}");
    }
}

impl Default for FileRecorder {
    /// Recorder writing into `<temp dir>/rama/har_recordings`, with a file
    /// prefix derived from the rama version (dots replaced by underscores).
    fn default() -> Self {
        let mut dir = std::env::temp_dir();
        dir.push("rama");
        dir.push("har_recordings");

        let prefix = format!(
            "rama_{}_recording",
            rama_utils::info::VERSION.replace('.', "_")
        );

        Self::new(dir, prefix)
    }
}

impl FileRecorder {
    /// Create a new [`FileRecorder`] for the given dir and prefix.
    ///
    /// HAR files are created inside `dir` and their names start with
    /// `prefix`. The writing itself happens in a background task that is
    /// spawned here; the returned value is the handle used to feed it.
    ///
    /// Use [`FileRecorder::default`] if you wish to use a temporary
    /// directory for it using the default rama-version based prefix.
    ///
    /// # Panics
    ///
    /// The background task is started with `tokio::spawn`, which panics
    /// when called outside of a Tokio runtime.
    #[must_use]
    pub fn new(dir: PathBuf, prefix: String) -> Self {
        // One channel slot per available core, falling back to a single slot
        // when the parallelism cannot be determined.
        let capacity =
            std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get);
        let (tx, rx) = mpsc::channel(capacity);

        tokio::spawn(FileRecorderTask::new(rx, dir, prefix).run());

        Self { tx }
    }
}

impl Recorder for FileRecorder {
    /// Hand `log` to the writer task and wait for its reply.
    ///
    /// Returns the extensions sent back by the task, or `None` when the
    /// reply channel is closed without a value (for instance because the
    /// message could not be delivered or the task failed to write the log).
    async fn record(&self, log: spec::Log) -> Option<Extensions> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let msg = FileRecorderMessage::Record {
            log: Box::new(log),
            ext: reply_tx,
        };

        if let Err(err) = self.tx.send(msg).await {
            tracing::debug!("FileRecorder: failed to send log for recording to task: {err}");
        }

        // The reply is awaited even after a failed send: the undelivered
        // message (and with it the reply sender) has been dropped by then,
        // so the await below resolves with an error instead of hanging.
        match reply_rx.await {
            Ok(extensions) => Some(extensions),
            Err(err) => {
                tracing::debug!("file recorder: record oneshot reply await error: {err}");
                None
            }
        }
    }

    /// Ask the writer task to finish the file it is currently recording into.
    ///
    /// A delivery failure is only logged.
    async fn stop_record(&self) {
        let Err(err) = self.tx.send(FileRecorderMessage::Stop).await else {
            return;
        };
        tracing::debug!("FileRecorder: failed to send stop record msg to task: {err}");
    }
}