sof 0.17.1

Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation
Documentation
use std::{
    collections::{BTreeMap, HashMap},
    time::{Duration, Instant},
};

const DATA_SHREDS_PER_FEC_SET: usize = 32;
const DEFAULT_TICKS_PER_SECOND: u64 = 160;
const MIN_PER_SLOT_REQUEST_CAP: usize = 1;
const MAX_TIP_PROBE_AHEAD_SLOTS: usize = 256;

#[path = "collect.rs"]
mod collect;
#[path = "slot_state.rs"]
mod slot_state;
#[path = "slots.rs"]
mod slots;
use slot_state::duration_to_ms_u64;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct MissingShredRequest {
    pub slot: u64,
    pub index: u32,
    pub kind: MissingShredRequestKind,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum MissingShredRequestKind {
    WindowIndex,
    HighestWindowIndex,
}

#[derive(Debug)]
pub struct MissingShredTracker {
    slots: HashMap<u64, SlotMissingState>,
    latest_slot: u64,
    has_latest_slot: bool,
    slot_window: u64,
    min_slot_lag: u64,
    settle_delay: Duration,
    request_cooldown: Duration,
    auto_backfill_sets: usize,
    per_slot_request_cap: usize,
    tip_probe_ahead_slots: usize,
    epoch: Instant,
}

#[derive(Debug, Default)]
struct SlotMissingState {
    sets: HashMap<u32, FecSetMissingState>,
    backfill_seeded: bool,
    seed_highest_probe: bool,
    first_seen_tick_ms: Option<u64>,
    first_data_tick_ms: Option<u64>,
    min_data_index_seen: Option<u32>,
    last_index_seen: Option<u32>,
    max_data_index_seen: Option<u32>,
    received_reference_ticks: BTreeMap<u32, u8>,
    last_highest_request_tick_ms: Option<u64>,
    contiguous_data_prefix: u32,
}

#[derive(Debug)]
struct FecSetMissingState {
    first_seen_tick_ms: u64,
    expected_data_shreds: u16,
    received_mask: u32,
    last_request_tick_ms: [u64; DATA_SHREDS_PER_FEC_SET],
}
impl MissingShredTracker {
    #[must_use]
    pub fn new(
        slot_window: u64,
        min_slot_lag: u64,
        settle_delay: Duration,
        request_cooldown: Duration,
        auto_backfill_sets: usize,
        per_slot_request_cap: usize,
        tip_probe_ahead_slots: usize,
    ) -> Self {
        Self {
            slots: HashMap::new(),
            latest_slot: 0,
            has_latest_slot: false,
            slot_window,
            min_slot_lag,
            settle_delay,
            request_cooldown,
            auto_backfill_sets,
            per_slot_request_cap: per_slot_request_cap.max(MIN_PER_SLOT_REQUEST_CAP),
            tip_probe_ahead_slots: tip_probe_ahead_slots.min(MAX_TIP_PROBE_AHEAD_SLOTS),
            epoch: Instant::now(),
        }
    }

    pub fn on_data_shred(
        &mut self,
        slot: u64,
        index: u32,
        fec_set_index: u32,
        last_in_slot: bool,
        reference_tick: u8,
        now: Instant,
    ) {
        let tick_ms = self.tick_ms(now);
        self.touch_slot(slot, tick_ms);
        let Some(position) = index.checked_sub(fec_set_index) else {
            return;
        };
        let Ok(position_usize) = usize::try_from(position) else {
            return;
        };
        if position_usize >= DATA_SHREDS_PER_FEC_SET {
            return;
        }
        let mut seed_next_slot_probe = false;
        {
            let slot_state = self.slots.entry(slot).or_default();
            slot_state.observe_data_index(index, reference_tick, tick_ms);
            if last_in_slot {
                slot_state.observe_last_in_slot(index);
                seed_next_slot_probe = true;
            }
            let set = slot_state
                .sets
                .entry(fec_set_index)
                .or_insert_with(|| FecSetMissingState::new(tick_ms));
            set.mark_received(position);
            slot_state.advance_contiguous_prefix();
        }
        self.seed_backfill_sets(slot, fec_set_index, tick_ms);
        if seed_next_slot_probe {
            self.seed_next_slot_probe(slot, tick_ms);
        }
    }

    pub fn on_recovered_data_shred(
        &mut self,
        slot: u64,
        index: u32,
        fec_set_index: u32,
        last_in_slot: bool,
        reference_tick: u8,
        now: Instant,
    ) {
        self.on_data_shred(
            slot,
            index,
            fec_set_index,
            last_in_slot,
            reference_tick,
            now,
        );
    }

    pub fn on_code_shred(
        &mut self,
        slot: u64,
        fec_set_index: u32,
        num_data_shreds: u16,
        now: Instant,
    ) {
        let tick_ms = self.tick_ms(now);
        self.touch_slot(slot, tick_ms);
        let slot_state = self.slots.entry(slot).or_default();
        let set = slot_state
            .sets
            .entry(fec_set_index)
            .or_insert_with(|| FecSetMissingState::new(tick_ms));
        set.set_expected_data_shreds(num_data_shreds);
        self.seed_backfill_sets(slot, fec_set_index, tick_ms);
    }

    pub fn seed_highest_probe_slot(&mut self, slot: u64, now: Instant) {
        let tick_ms = self.tick_ms(now);
        self.touch_slot(slot, tick_ms);
        if let Some(state) = self.slots.get_mut(&slot)
            && state.received_upper_bound().is_none()
        {
            state.seed_highest_probe = true;
        }
    }

    pub const fn set_min_slot_lag(&mut self, min_slot_lag: u64) {
        self.min_slot_lag = min_slot_lag;
    }

    pub fn set_per_slot_request_cap(&mut self, per_slot_request_cap: usize) {
        self.per_slot_request_cap = per_slot_request_cap.max(MIN_PER_SLOT_REQUEST_CAP);
    }
}