tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;

#[derive(Default)]
struct ReplayRunStats {
    frames: u64,
    series_definitions: u64,
    sample_batches: u64,
    points: u64,
}

#[derive(Clone, Copy)]
struct LifecycleReplayContext<'a> {
    wal: Option<&'a crate::engine::wal::FramedWal>,
    observability: &'a StorageObservabilityCounters,
    publication: LifecyclePublicationContext<'a>,
}

impl<'a> LifecycleReplayContext<'a> {
    fn wal(self) -> Option<&'a crate::engine::wal::FramedWal> {
        self.wal
    }

    fn record_started(self) {
        self.observability
            .wal
            .replay_runs_total
            .fetch_add(1, Ordering::Relaxed);
    }

    fn register_series_definitions(
        self,
        definitions: &[crate::engine::wal::SeriesDefinitionFrame],
    ) -> Result<()> {
        let replayed_series_ids =
            self.publication
                .mutate_registry(|registry| -> Result<Vec<SeriesId>> {
                    let mut replayed_series_ids = Vec::with_capacity(definitions.len());
                    for definition in definitions {
                        let resolution = registry.register_series_with_id(
                            definition.series_id,
                            &definition.metric,
                            &definition.labels,
                        )?;
                        if resolution.created {
                            replayed_series_ids.push(definition.series_id);
                        }
                    }
                    Ok(replayed_series_ids)
                })?;
        self.publication
            .registry_bookkeeping
            .mark_series_pending(replayed_series_ids);
        Ok(())
    }

    fn record_committed_series_definitions(
        self,
        definitions: Vec<crate::engine::wal::SeriesDefinitionFrame>,
    ) {
        if let Some(wal) = self.wal {
            wal.record_committed_series_definitions_if_initialized(definitions);
        }
    }

    fn finish(
        self,
        started: Instant,
        stats: ReplayRunStats,
        replay_result: Result<()>,
    ) -> Result<()> {
        self.observability
            .wal
            .replay_duration_nanos_total
            .fetch_add(elapsed_nanos_u64(started), Ordering::Relaxed);

        match replay_result {
            Ok(()) => {
                self.observability
                    .wal
                    .replay_frames_total
                    .fetch_add(stats.frames, Ordering::Relaxed);
                self.observability
                    .wal
                    .replay_series_definitions_total
                    .fetch_add(stats.series_definitions, Ordering::Relaxed);
                self.observability
                    .wal
                    .replay_sample_batches_total
                    .fetch_add(stats.sample_batches, Ordering::Relaxed);
                self.observability
                    .wal
                    .replay_points_total
                    .fetch_add(stats.points, Ordering::Relaxed);
                Ok(())
            }
            Err(err) => {
                self.observability
                    .wal
                    .replay_errors_total
                    .fetch_add(1, Ordering::Relaxed);
                Err(err)
            }
        }
    }
}

impl ChunkStorage {
    fn lifecycle_replay_context(&self) -> LifecycleReplayContext<'_> {
        LifecycleReplayContext {
            wal: self.persisted.wal.as_ref(),
            observability: self.observability.as_ref(),
            publication: self.lifecycle_publication_context(),
        }
    }

    pub(in super::super) fn replay_from_wal(
        &self,
        replay_highwater: WalHighWatermark,
        replay_mode: crate::wal::WalReplayMode,
    ) -> Result<()> {
        let replay = self.lifecycle_replay_context();
        let Some(wal) = replay.wal() else {
            return Ok(());
        };

        wal.set_configured_replay_mode(replay_mode);
        replay.record_started();
        let started = Instant::now();
        let mut replay_stats = ReplayRunStats::default();

        let replay_result = (|| -> Result<()> {
            let mut stream =
                wal.replay_committed_write_stream_after_with_mode(replay_highwater, replay_mode)?;
            while let Some(write) = stream.next_write()? {
                replay_stats.frames = replay_stats.frames.saturating_add(
                    saturating_u64_from_usize(write.series_definitions.len()).saturating_add(1),
                );
                replay_stats.series_definitions = replay_stats
                    .series_definitions
                    .saturating_add(saturating_u64_from_usize(write.series_definitions.len()));
                let replayed_series_definitions = write.series_definitions.clone();
                replay.register_series_definitions(&write.series_definitions)?;
                replay_stats.sample_batches = replay_stats
                    .sample_batches
                    .saturating_add(saturating_u64_from_usize(write.sample_batches.len()));
                replay_stats.points = replay_stats.points.saturating_add(
                    self.replay_wal_sample_batches(write.sample_batches, write.highwater)?,
                );
                replay.record_committed_series_definitions(replayed_series_definitions);
            }

            Ok(())
        })();

        replay.finish(started, replay_stats, replay_result)
    }
}