sof 0.18.1

Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation
Documentation
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use super::stream::SlotStream;

/// A cheaply clonable window into a reference-counted byte buffer.
///
/// The window covers `len` bytes starting at `offset` inside `bytes`; both
/// values are stored as `u32`. The derived equality compares the entire
/// backing buffer together with `offset` and `len`, not only the bytes that
/// are visible through the window.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SharedPayloadFragment {
    /// Backing storage, shared between clones of the fragment.
    bytes: Arc<[u8]>,
    /// Position of the first visible byte within `bytes`.
    offset: u32,
    /// Number of visible bytes.
    len: u32,
}

impl SharedPayloadFragment {
    /// Creates a fragment that views `len` bytes of `bytes` starting at `offset`.
    ///
    /// Returns `None` when `offset + len` overflows, when the requested range
    /// extends past the end of `bytes`, or when either value does not fit in
    /// a `u32`.
    #[must_use]
    pub fn borrowed(bytes: Arc<[u8]>, offset: usize, len: usize) -> Option<Self> {
        let range_fits = matches!(offset.checked_add(len), Some(end) if end <= bytes.len());
        if !range_fits {
            return None;
        }
        let offset = u32::try_from(offset).ok()?;
        let len = u32::try_from(len).ok()?;
        Some(Self { bytes, offset, len })
    }

    /// Wraps an owned buffer in a fragment that views it from offset zero.
    ///
    /// The recorded length saturates at `u32::MAX`, so a longer vector is
    /// only visible up to that many bytes.
    #[must_use]
    pub fn owned(bytes: Vec<u8>) -> Self {
        Self {
            // Evaluated before `bytes` is moved into the `Arc` below.
            len: u32::try_from(bytes.len()).unwrap_or(u32::MAX),
            bytes: Arc::from(bytes),
            offset: 0,
        }
    }

    /// Returns the visible bytes, or an empty slice if the stored range does
    /// not lie inside the backing buffer.
    #[must_use]
    pub fn as_slice(&self) -> &[u8] {
        let first = usize::try_from(self.offset).unwrap_or(0);
        let count = usize::try_from(self.len).unwrap_or(0);
        self.bytes
            .get(first..first.saturating_add(count))
            .unwrap_or_default()
    }

    /// Returns the number of visible bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        match usize::try_from(self.len) {
            Ok(count) => count,
            Err(_) => usize::MAX,
        }
    }

    /// Returns `true` when the fragment views zero bytes.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        matches!(self.len, 0)
    }
}

/// An ordered list of payload fragments with precomputed byte totals.
///
/// `suffix_total_lens[i]` holds the combined length of fragments `i..`, so
/// the size of any tail of the batch can be read without re-summing.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PayloadFragmentBatch {
    /// The fragments, in the order they were supplied.
    fragments: Vec<SharedPayloadFragment>,
    /// Combined length of every fragment (saturating).
    total_len: usize,
    /// Combined length of fragments `i..` for each index `i` (saturating).
    suffix_total_lens: Vec<usize>,
}

impl PayloadFragmentBatch {
    /// Builds a batch from `fragments` and precomputes the suffix totals.
    ///
    /// Lengths are accumulated with saturating addition.
    #[must_use]
    pub fn new(fragments: Vec<SharedPayloadFragment>) -> Self {
        let mut suffix_total_lens = vec![0_usize; fragments.len()];
        let mut running = 0_usize;
        // Walk back to front so each entry receives the sum of itself and
        // everything after it.
        for (suffix_total, fragment) in suffix_total_lens.iter_mut().zip(&fragments).rev() {
            running = running.saturating_add(fragment.len());
            *suffix_total = running;
        }
        Self {
            fragments,
            total_len: running,
            suffix_total_lens,
        }
    }

    /// Builds a batch by wrapping each owned buffer with
    /// `SharedPayloadFragment::owned`.
    #[must_use]
    pub fn from_owned_fragments(fragments: Vec<Vec<u8>>) -> Self {
        let shared: Vec<SharedPayloadFragment> = fragments
            .into_iter()
            .map(SharedPayloadFragment::owned)
            .collect();
        Self::new(shared)
    }

    /// Returns the number of fragments in the batch.
    #[must_use]
    pub const fn len(&self) -> usize {
        self.fragments.len()
    }

    /// Returns `true` when the batch holds no fragments.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.fragments.is_empty()
    }

    /// Returns the combined byte length of all fragments.
    #[must_use]
    pub const fn total_len(&self) -> usize {
        self.total_len
    }

    /// Returns the combined byte length of fragments `start_index..`.
    ///
    /// Returns `None` when `start_index` is not a valid fragment index, which
    /// includes `start_index == self.len()`.
    #[must_use]
    pub fn total_len_from(&self, start_index: usize) -> Option<usize> {
        let suffix_total = *self.suffix_total_lens.get(start_index)?;
        Some(suffix_total)
    }

    /// Returns the fragments from `start_index` to the end.
    ///
    /// Returns `None` when `start_index` is greater than the fragment count;
    /// an index equal to the count yields an empty slice.
    #[must_use]
    pub fn slice_from(&self, start_index: usize) -> Option<&[SharedPayloadFragment]> {
        self.fragments.as_slice().get(start_index..)
    }

    /// Returns the byte length of the fragment at `index`, if it exists.
    #[must_use]
    pub fn fragment_len(&self, index: usize) -> Option<usize> {
        let fragment = self.fragments.get(index)?;
        Some(fragment.len())
    }

    /// Returns the fragment at `start_index` only when it is the last one,
    /// i.e. when the tail starting there contains exactly one fragment.
    #[must_use]
    pub fn single_fragment_from(&self, start_index: usize) -> Option<&SharedPayloadFragment> {
        if let [only] = self.slice_from(start_index)? {
            Some(only)
        } else {
            None
        }
    }
}

/// A data set returned by `DataSetReassembler::ingest_data_shred_meta` and
/// `DataSetReassembler::ingest_data_shred_meta_at`.
///
/// Values are produced by `SlotStream::drain_contiguous_datasets` in the
/// sibling `stream` module, which is not visible from this file. The field
/// notes below are inferred from names and types — confirm against that code.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CompletedDataSet {
    /// Slot number; presumably the slot the contributing shreds belong to.
    pub slot: u64,
    /// Presumably the index of the first shred in the set — TODO confirm.
    pub start_index: u32,
    /// Presumably the index of the last shred in the set; whether the bound
    /// is inclusive is not visible here — TODO confirm.
    pub end_index: u32,
    /// Payload fragments that make up the set.
    pub payload_fragments: PayloadFragmentBatch,
    /// Presumably mirrors the `last_in_slot` flag passed at ingest time for
    /// the final shred of the set — TODO confirm.
    pub last_in_slot: bool,
    /// Presumably the earliest observation time among the set's shreds —
    /// TODO confirm.
    pub first_shred_observed_at: Instant,
    /// Presumably the latest observation time among the set's shreds —
    /// TODO confirm.
    pub last_shred_observed_at: Instant,
}

/// A data set returned by `DataSetReassembler::inline_contiguous_dataset`
/// and `DataSetReassembler::inline_contiguous_datasets`.
///
/// Values are built by `SlotStream` in the sibling `stream` module, which is
/// not visible from this file. The field notes below are inferred from names
/// and types — confirm against that code.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct InlineContiguousDataSet {
    /// Slot number; presumably the slot the contributing shreds belong to.
    pub slot: u64,
    /// Presumably the index of the first shred in the run — TODO confirm.
    pub start_index: u32,
    /// Presumably the index of the last shred in the run; whether the bound
    /// is inclusive is not visible here — TODO confirm.
    pub end_index: u32,
    /// Payload fragments that make up the run.
    pub payload_fragments: PayloadFragmentBatch,
    /// Observation timestamps; presumably one per entry in
    /// `payload_fragments`, in the same order — TODO confirm.
    pub fragment_observed_ats: Vec<Instant>,
}

/// Tracks one `SlotStream` per slot and forwards incoming data shreds to it.
///
/// Note that the derived `Default` leaves every numeric field at zero, which
/// differs from the values chosen by `DataSetReassembler::new`.
#[derive(Debug, Default)]
pub struct DataSetReassembler {
    /// Per-slot reassembly state, keyed by slot number.
    slots: HashMap<u64, SlotStream>,
    /// Once this many slots are tracked, ingesting a shred for an untracked
    /// slot first evicts the lowest-numbered tracked slot.
    max_tracked_slots: usize,
    /// On every ingest, slots below `slot - retained_slot_lag` (saturating)
    /// are purged.
    retained_slot_lag: u64,
    /// Most recent floor applied by `purge_older_than`; calls with a floor
    /// that is not strictly greater are skipped.
    last_pruned_floor: u64,
    /// Passed as the second argument to `SlotStream::new` for each new slot;
    /// its exact meaning is defined by `SlotStream` (not visible here).
    tail_min_shreds_without_anchor: usize,
}

impl DataSetReassembler {
    /// Creates a reassembler that tracks at most `max_tracked_slots` slots.
    ///
    /// The retained slot lag defaults to a quarter of `max_tracked_slots`,
    /// clamped to the range `64..=256`, and the tail threshold handed to each
    /// new `SlotStream` defaults to 2.
    #[must_use]
    pub fn new(max_tracked_slots: usize) -> Self {
        let default_lag = u64::try_from(max_tracked_slots / 4)
            .unwrap_or(u64::MAX)
            .clamp(64, 256);
        Self {
            slots: HashMap::new(),
            max_tracked_slots,
            retained_slot_lag: default_lag,
            last_pruned_floor: 0,
            tail_min_shreds_without_anchor: 2,
        }
    }

    /// Overrides the retained slot lag; values below 1 are raised to 1.
    #[must_use]
    pub fn with_retained_slot_lag(mut self, retained_slot_lag: u64) -> Self {
        self.retained_slot_lag = std::cmp::max(retained_slot_lag, 1);
        self
    }

    /// Overrides the tail threshold handed to each new `SlotStream`; values
    /// below 1 are raised to 1.
    #[must_use]
    pub fn with_tail_min_shreds_without_anchor(mut self, min_shreds: usize) -> Self {
        self.tail_min_shreds_without_anchor = std::cmp::max(min_shreds, 1);
        self
    }

    /// Ingests one data shred, stamping it with the current time.
    ///
    /// Equivalent to calling [`Self::ingest_data_shred_meta_at`] with
    /// `Instant::now()` as the observation time.
    pub fn ingest_data_shred_meta(
        &mut self,
        slot: u64,
        index: u32,
        data_complete: bool,
        last_in_slot: bool,
        payload_fragment: SharedPayloadFragment,
    ) -> Vec<CompletedDataSet> {
        let observed_at = Instant::now();
        self.ingest_data_shred_meta_at(
            slot,
            index,
            data_complete,
            last_in_slot,
            payload_fragment,
            observed_at,
        )
    }

    /// Ingests one data shred observed at `observed_at`.
    ///
    /// Before inserting, slots older than `slot - retained_slot_lag` are
    /// purged and, if the tracker is full, one slot is evicted to make room.
    /// The shred is then handed to the slot's stream (created on demand) and
    /// whatever the stream drains as completed is returned. A stream that
    /// reports `finished` is dropped from tracking.
    pub fn ingest_data_shred_meta_at(
        &mut self,
        slot: u64,
        index: u32,
        data_complete: bool,
        last_in_slot: bool,
        payload_fragment: SharedPayloadFragment,
        observed_at: Instant,
    ) -> Vec<CompletedDataSet> {
        let slot_floor = slot.saturating_sub(self.retained_slot_lag);
        self.purge_older_than(slot_floor);
        self.evict_if_needed(slot);

        // Copy the threshold out first so the closure below does not need to
        // borrow `self` while the map entry is held.
        let tail_min_shreds = self.tail_min_shreds_without_anchor;
        let stream = self
            .slots
            .entry(slot)
            .or_insert_with(|| SlotStream::new(slot, tail_min_shreds));
        stream.insert(
            index,
            data_complete,
            last_in_slot,
            payload_fragment,
            observed_at,
        );

        let completed = stream.drain_contiguous_datasets();
        if stream.finished {
            let _ = self.slots.remove(&slot);
        }
        completed
    }

    /// Returns the tracked stream's inline contiguous data set for `slot`,
    /// or `None` when the slot is not tracked or the stream has none.
    #[must_use]
    pub fn inline_contiguous_dataset(&self, slot: u64) -> Option<InlineContiguousDataSet> {
        let stream = self.slots.get(&slot)?;
        stream.inline_contiguous_dataset()
    }

    /// Returns the tracked stream's inline contiguous data sets for `slot`,
    /// or an empty vector when the slot is not tracked.
    #[must_use]
    pub fn inline_contiguous_datasets(&self, slot: u64) -> Vec<InlineContiguousDataSet> {
        match self.slots.get(&slot) {
            Some(stream) => stream.inline_contiguous_datasets(),
            None => Vec::new(),
        }
    }

    /// Makes room for `incoming_slot` by dropping the lowest-numbered tracked
    /// slot, but only when the tracker is full and `incoming_slot` is not
    /// already tracked.
    fn evict_if_needed(&mut self, incoming_slot: u64) {
        let at_capacity = self.slots.len() >= self.max_tracked_slots;
        if !at_capacity || self.slots.contains_key(&incoming_slot) {
            return;
        }
        if let Some(lowest_slot) = self.slots.keys().copied().min() {
            let _ = self.slots.remove(&lowest_slot);
        }
    }

    /// Drops every tracked slot numbered below `slot_floor` and returns how
    /// many were removed.
    ///
    /// The call is skipped (returning 0) unless `slot_floor` is strictly
    /// greater than the floor applied by the previous effective purge.
    pub fn purge_older_than(&mut self, slot_floor: u64) -> usize {
        let floor_advanced = slot_floor > self.last_pruned_floor;
        if !floor_advanced {
            return 0;
        }
        self.last_pruned_floor = slot_floor;
        let tracked_before = self.slots.len();
        self.slots
            .retain(|&tracked_slot, _| tracked_slot >= slot_floor);
        tracked_before.saturating_sub(self.slots.len())
    }

    /// Returns the number of slots currently tracked.
    #[must_use]
    pub fn tracked_slots(&self) -> usize {
        self.slots.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Purging with a floor of 121 must drop slots 100 and 120 and leave
    /// only slot 140 tracked.
    #[test]
    fn purge_older_than_removes_old_slot_streams() {
        let mut reassembler = DataSetReassembler::new(16).with_retained_slot_lag(64);
        for slot in [100_u64, 120, 140] {
            let _ = reassembler.slots.insert(slot, SlotStream::new(slot, 2));
        }

        let removed = reassembler.purge_older_than(121);

        assert_eq!(removed, 2);
        assert_eq!(reassembler.tracked_slots(), 1);
        assert!(reassembler.slots.contains_key(&140));
    }
}