focl 0.1.0

focl/focld - lightweight Rust BGP speaker
Documentation
use std::fs::{self, File};
use std::io::{BufWriter, Write};

use anyhow::{Context, Result};
use bzip2::write::BzEncoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use zstd::stream::write::Encoder as ZstdEncoder;

use crate::archive::manifest::SegmentManifest;
use crate::archive::types::{ArchiveStream, FinalizedSegment, SegmentPaths};
use crate::config::{ArchiveConfig, CompressionKind};

enum SegmentEncoder {
    Gzip(GzEncoder<BufWriter<File>>),
    Bzip2(BzEncoder<BufWriter<File>>),
    Zstd(ZstdEncoder<'static, BufWriter<File>>),
}

impl SegmentEncoder {
    fn write_all(&mut self, buf: &[u8]) -> Result<()> {
        match self {
            SegmentEncoder::Gzip(writer) => writer.write_all(buf)?,
            SegmentEncoder::Bzip2(writer) => writer.write_all(buf)?,
            SegmentEncoder::Zstd(writer) => writer.write_all(buf)?,
        }
        Ok(())
    }

    fn flush(&mut self) -> Result<()> {
        match self {
            SegmentEncoder::Gzip(writer) => writer.flush()?,
            SegmentEncoder::Bzip2(writer) => writer.flush()?,
            SegmentEncoder::Zstd(writer) => writer.flush()?,
        }
        Ok(())
    }

    fn finish(mut self) -> Result<File> {
        self.flush()?;
        let file = match self {
            SegmentEncoder::Gzip(writer) => writer
                .finish()
                .context("failed to finish gzip stream")?
                .into_inner()
                .map_err(|e| anyhow::anyhow!(e.to_string()))?,
            SegmentEncoder::Bzip2(writer) => writer
                .finish()
                .context("failed to finish bzip2 stream")?
                .into_inner()
                .map_err(|e| anyhow::anyhow!(e.to_string()))?,
            SegmentEncoder::Zstd(writer) => writer
                .finish()
                .context("failed to finish zstd stream")?
                .into_inner()
                .map_err(|e| anyhow::anyhow!(e.to_string()))?,
        };
        Ok(file)
    }
}

pub struct SegmentWriter {
    cfg: ArchiveConfig,
    stream: ArchiveStream,
    start_ts: i64,
    paths: SegmentPaths,
    encoder: SegmentEncoder,
    record_count: u64,
}

impl SegmentWriter {
    pub fn new(
        cfg: &ArchiveConfig,
        stream: ArchiveStream,
        start_ts: i64,
        paths: SegmentPaths,
    ) -> Result<Self> {
        if let Some(parent) = paths.tmp_path.parent() {
            fs::create_dir_all(parent)
                .with_context(|| format!("failed to create tmp directory {}", parent.display()))?;
        }
        if let Some(parent) = paths.final_path.parent() {
            fs::create_dir_all(parent).with_context(|| {
                format!("failed to create final directory {}", parent.display())
            })?;
        }

        let file = File::create(&paths.tmp_path).with_context(|| {
            format!("failed to create tmp segment {}", paths.tmp_path.display())
        })?;
        let buffered = BufWriter::new(file);

        let encoder = match cfg.compression {
            CompressionKind::Gzip => {
                SegmentEncoder::Gzip(GzEncoder::new(buffered, Compression::default()))
            }
            CompressionKind::Bzip2 => {
                SegmentEncoder::Bzip2(BzEncoder::new(buffered, bzip2::Compression::default()))
            }
            CompressionKind::Zstd => {
                let enc = ZstdEncoder::new(buffered, 3).context("failed to create zstd encoder")?;
                SegmentEncoder::Zstd(enc)
            }
        };

        Ok(Self {
            cfg: cfg.clone(),
            stream,
            start_ts,
            paths,
            encoder,
            record_count: 0,
        })
    }

    pub fn write_record(&mut self, record: &[u8]) -> Result<()> {
        self.encoder.write_all(record)?;
        self.record_count += 1;
        Ok(())
    }

    pub fn path(&self) -> &std::path::Path {
        &self.paths.final_path
    }

    pub fn record_count(&self) -> u64 {
        self.record_count
    }

    pub fn start_ts(&self) -> i64 {
        self.start_ts
    }

    pub fn finalize(self, end_ts: i64) -> Result<FinalizedSegment> {
        let file = self.encoder.finish()?;
        if self.cfg.fsync_on_rotate {
            file.sync_all().context("failed to fsync archive segment")?;
        }
        drop(file);

        fs::rename(&self.paths.tmp_path, &self.paths.final_path).with_context(|| {
            format!(
                "failed to atomically move {} to {}",
                self.paths.tmp_path.display(),
                self.paths.final_path.display()
            )
        })?;

        let manifest = SegmentManifest::build(
            self.cfg.collector_id.clone(),
            self.stream,
            self.start_ts,
            end_ts,
            self.record_count,
            self.cfg.compression,
            self.cfg.layout_profile,
            &self.paths.final_path,
            &self.paths.relative_path,
        )?;

        let manifest_path = manifest.write_sidecar(&self.paths.final_path)?;

        Ok(FinalizedSegment {
            stream: self.stream,
            start_ts: self.start_ts,
            end_ts,
            record_count: self.record_count,
            bytes: manifest.bytes,
            compression: self.cfg.compression,
            final_path: self.paths.final_path,
            relative_path: self.paths.relative_path,
            manifest_path,
        })
    }
}