sof 0.17.1

Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation
Documentation
use std::collections::{HashMap, hash_map::Entry};

use reed_solomon_erasure::galois_8::ReedSolomon;

use crate::shred::wire::{ParsedShred, ShredVariant, parse_shred};

#[path = "recover.rs"]
mod recover;

use recover::{parse_packet_signature, recover_missing_data};

/// Byte length of a shred signature. Also used as the offset at which a data shred's erasure
/// shard starts inside the packet, and as the extra trailer size for resigned variants.
const SIZE_OF_SIGNATURE: usize = 64;
/// Bytes reserved for the Merkle root when sizing the trailer that follows the erasure shard.
const SIZE_OF_MERKLE_ROOT: usize = 32;
/// Bytes reserved per Merkle proof entry; multiplied by a variant's `proof_size`.
const SIZE_OF_MERKLE_PROOF_ENTRY: usize = 20;

/// Collects shred packets into per-`(slot, fec_set_index)` erasure sets and hands each updated
/// set to `recover_missing_data` so missing data can be rebuilt.
pub struct FecRecoverer {
    /// Erasure sets that are still being assembled, keyed by `(slot, fec_set_index)`.
    sets: HashMap<(u64, u32), ErasureSet>,
    /// Encoder cache passed to `recover_missing_data`.
    /// NOTE(review): key is presumably `(data shards, coding shards)` — confirm in `recover.rs`.
    reed_solomon_cache: HashMap<(usize, usize), ReedSolomon>,
    /// Once this many sets are tracked, admitting a new set first evicts the lowest-keyed one.
    max_tracked_sets: usize,
    /// How far (in slots) a set may trail the slot of the packet being ingested before it is
    /// purged. Clamped to at least 1 by `new`.
    retained_slot_lag: u64,
    /// Highest slot floor that `purge_older_than` has applied so far.
    last_pruned_floor: u64,
}

/// Accumulated state for one erasure set: the shards received so far plus the layout learned
/// from coding shreds.
struct ErasureSet {
    /// Variant recorded from the first ingested packet; `accepts_variant` compares later
    /// packets against it.
    variant: Option<SetVariant>,
    /// Data/coding shard counts taken from a coding shred's header. `None` until a coding
    /// shred has been ingested.
    config: Option<ErasureConfig>,
    /// FEC set index of the coding shred that established `config`.
    config_fec_set_index: Option<u32>,
    /// Signature supplied when the set was created (parsed from the packet that opened it).
    leader_signature: [u8; SIZE_OF_SIGNATURE],
    /// Data erasure shards keyed by shred index.
    data_shards: HashMap<u32, Vec<u8>>,
    /// Coding erasure shards keyed by the coding header's `position`.
    coding_shards: HashMap<u16, Vec<u8>>,
    /// Running count of stored data shards considered inside the config's data range. Bumped
    /// on insert and recomputed whenever `config` is (re)established.
    present_data_shreds_in_config: usize,
}

/// The parts of a shred variant that must match for two packets to share an erasure set and
/// that determine the erasure shard length.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SetVariant {
    /// Merkle proof size; multiplied by `SIZE_OF_MERKLE_PROOF_ENTRY` when sizing the trailer.
    proof_size: u8,
    /// When set, an extra `SIZE_OF_SIGNATURE` bytes are reserved in the trailer.
    resigned: bool,
}

/// Shard layout of an erasure set as announced by a coding shred's header.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ErasureConfig {
    /// Number of data shreds in the set (from `num_data_shreds`).
    num_data: usize,
    /// Number of coding shreds in the set (from `num_coding_shreds`).
    num_coding: usize,
}

impl ErasureSet {
    /// Creates an empty set that remembers `leader_signature`.
    ///
    /// No variant or config is known yet, no shards are stored, and the in-config data
    /// counter starts at zero.
    fn new(leader_signature: [u8; SIZE_OF_SIGNATURE]) -> Self {
        let data_shards = HashMap::new();
        let coding_shards = HashMap::new();
        Self {
            leader_signature,
            data_shards,
            coding_shards,
            present_data_shreds_in_config: 0,
            config_fec_set_index: None,
            config: None,
            variant: None,
        }
    }
}

impl FecRecoverer {
    #[must_use]
    pub fn new(max_tracked_sets: usize, retained_slot_lag: u64) -> Self {
        Self {
            sets: HashMap::new(),
            reed_solomon_cache: HashMap::new(),
            max_tracked_sets,
            retained_slot_lag: retained_slot_lag.max(1),
            last_pruned_floor: 0,
        }
    }

    pub fn ingest_packet(&mut self, packet: &[u8]) -> Vec<Vec<u8>> {
        let parsed = match parse_shred(packet) {
            Ok(parsed) => parsed,
            Err(_) => return Vec::new(),
        };
        let signature = match parse_packet_signature(packet) {
            Some(signature) => signature,
            None => return Vec::new(),
        };

        let (slot, fec_set_index, variant) = match &parsed {
            ParsedShred::Data(data) => (
                data.common.slot,
                data.common.fec_set_index,
                SetVariant::from(data.common.shred_variant),
            ),
            ParsedShred::Code(code) => (
                code.common.slot,
                code.common.fec_set_index,
                SetVariant::from(code.common.shred_variant),
            ),
        };

        let set_id = (slot, fec_set_index);
        self.purge_older_than(slot.saturating_sub(self.retained_slot_lag));
        self.evict_if_needed(set_id);

        let mut recovered = Vec::new();
        let mut should_remove = false;
        if let Some(set) = self.sets.get_mut(&set_id) {
            if !set.accepts_variant(variant) {
                return Vec::new();
            }
            set.ingest_packet(&parsed, packet);
            recovered = recover_missing_data(set, fec_set_index, &mut self.reed_solomon_cache)
                .unwrap_or_default();
            should_remove = set.is_data_complete_for_config(fec_set_index);
        } else {
            let mut new_set = ErasureSet::new(signature);
            new_set.ingest_packet(&parsed, packet);
            let _ = self.sets.insert(set_id, new_set);
        }

        if should_remove {
            let _ = self.sets.remove(&set_id);
        }

        recovered
    }

    fn evict_if_needed(&mut self, incoming_set: (u64, u32)) {
        if self.sets.len() < self.max_tracked_sets || self.sets.contains_key(&incoming_set) {
            return;
        }
        if let Some(oldest_key) = self
            .sets
            .keys()
            .min_by_key(|(slot, fec_set_index)| (*slot, *fec_set_index))
            .copied()
        {
            let _ = self.sets.remove(&oldest_key);
        }
    }

    pub fn purge_older_than(&mut self, slot_floor: u64) -> usize {
        if slot_floor <= self.last_pruned_floor {
            return 0;
        }
        let before = self.sets.len();
        self.sets.retain(|(slot, _), _| *slot >= slot_floor);
        self.last_pruned_floor = slot_floor;
        before.saturating_sub(self.sets.len())
    }

    #[must_use]
    pub fn tracked_sets(&self) -> usize {
        self.sets.len()
    }
}

impl ErasureSet {
    /// Returns `true` when `incoming` may join this set: either no variant has been recorded
    /// yet, or the recorded variant equals `incoming`.
    fn accepts_variant(&self, incoming: SetVariant) -> bool {
        match self.variant {
            None => true,
            Some(existing) => existing == incoming,
        }
    }

    /// Stores the erasure shard carried by `packet` and updates the set's bookkeeping.
    ///
    /// * The first packet pins the set's variant.
    /// * A data shred is stored under its shred index (first copy wins) and, when it falls in
    ///   the known config range, bumps the in-config counter.
    /// * A coding shred whose header disagrees with the established config is ignored.
    ///   Otherwise it is stored under its position (first copy wins) and, if it is the first
    ///   to announce a config, establishes the config and triggers a recount of in-range data
    ///   shards.
    ///
    /// Packets whose shard length cannot be computed or whose bytes are too short are dropped
    /// silently.
    fn ingest_packet(&mut self, parsed: &ParsedShred, packet: &[u8]) {
        let incoming_variant = SetVariant::from(match parsed {
            ParsedShred::Data(data) => data.common.shred_variant,
            ParsedShred::Code(code) => code.common.shred_variant,
        });
        if self.variant.is_none() {
            self.variant = Some(incoming_variant);
        }
        let Some(shard_len) = coding_erasure_shard_len(incoming_variant) else {
            return;
        };

        match parsed {
            ParsedShred::Data(data) => {
                let Some(shard) = extract_data_erasure_shard(packet, shard_len) else {
                    return;
                };
                let index = data.common.index;
                if let Entry::Vacant(vacant) = self.data_shards.entry(index) {
                    let _ = vacant.insert(shard);
                    if self.index_within_config(index, self.config_fec_set_index) {
                        self.present_data_shreds_in_config =
                            self.present_data_shreds_in_config.saturating_add(1);
                    }
                }
            }
            ParsedShred::Code(code) => {
                let incoming_config = ErasureConfig {
                    num_data: usize::from(code.coding_header.num_data_shreds),
                    num_coding: usize::from(code.coding_header.num_coding_shreds),
                };
                // Check for a conflicting layout before copying the shard so rejected packets
                // cost no allocation.
                if self
                    .config
                    .is_some_and(|existing| existing != incoming_config)
                {
                    return;
                }
                let Some(shard) = extract_coding_erasure_shard(packet, shard_len) else {
                    return;
                };
                let _ = self
                    .coding_shards
                    .entry(code.coding_header.position)
                    .or_insert(shard);

                let set_index = code.common.fec_set_index;
                if self.config != Some(incoming_config)
                    || self.config_fec_set_index != Some(set_index)
                {
                    self.config = Some(incoming_config);
                    self.config_fec_set_index = Some(set_index);
                    // Data shards that arrived before the config was known are counted now.
                    self.present_data_shreds_in_config =
                        self.count_present_data_shreds_in_config(set_index);
                }
            }
        }
    }

    /// Returns `true` when a config is known, it was established for `fec_set_index`, and at
    /// least `num_data` in-range data shards are present.
    fn is_data_complete_for_config(&self, fec_set_index: u32) -> bool {
        self.config.is_some_and(|config| {
            self.present_data_shreds_in_config >= config.num_data
                && self.config_fec_set_index == Some(fec_set_index)
        })
    }

    /// Returns `true` when `index` lies in `[fec_set_index, fec_set_index + num_data)`.
    ///
    /// Returns `false` when the config or `fec_set_index` is unknown, or when `index` precedes
    /// the FEC set index.
    fn index_within_config(&self, index: u32, fec_set_index: Option<u32>) -> bool {
        let (Some(config), Some(fec_set_index)) = (self.config, fec_set_index) else {
            return false;
        };
        let Some(delta) = index.checked_sub(fec_set_index) else {
            return false;
        };
        let Ok(offset) = usize::try_from(delta) else {
            return false;
        };
        offset < config.num_data
    }

    /// Counts stored data shards that lie inside the config range anchored at `fec_set_index`.
    ///
    /// Uses the same range test as the incremental path (`index_within_config`), so a shard
    /// whose index precedes `fec_set_index` is never counted. Returns 0 while no config is
    /// known.
    fn count_present_data_shreds_in_config(&self, fec_set_index: u32) -> usize {
        self.data_shards
            .keys()
            .filter(|&&index| self.index_within_config(index, Some(fec_set_index)))
            .count()
    }

    /// Stores a recovered data shard under `index` unless one is already present.
    ///
    /// Returns `true` if the shard was inserted. The in-config counter is bumped when `index`
    /// lies in the config range anchored at `fec_set_index`.
    fn insert_recovered_data_shred(
        &mut self,
        fec_set_index: u32,
        index: u32,
        recovered_shard: Vec<u8>,
    ) -> bool {
        let Entry::Vacant(vacant) = self.data_shards.entry(index) else {
            return false;
        };
        let _ = vacant.insert(recovered_shard);
        if self.index_within_config(index, Some(fec_set_index)) {
            self.present_data_shreds_in_config =
                self.present_data_shreds_in_config.saturating_add(1);
        }
        true
    }
}

impl From<ShredVariant> for SetVariant {
    /// Keeps only the `proof_size` and `resigned` fields of the wire-level variant.
    fn from(value: ShredVariant) -> Self {
        let (proof_size, resigned) = (value.proof_size, value.resigned);
        Self {
            proof_size,
            resigned,
        }
    }
}

fn extract_data_erasure_shard(packet: &[u8], shard_len: usize) -> Option<Vec<u8>> {
    let start = SIZE_OF_SIGNATURE;
    let end = start.checked_add(shard_len)?;
    packet.get(start..end).map(ToOwned::to_owned)
}

/// Copies the `shard_len` bytes that follow the coding shred headers of a coding shred packet.
///
/// Returns `None` if the range overflows or extends past the end of `packet`.
fn extract_coding_erasure_shard(packet: &[u8], shard_len: usize) -> Option<Vec<u8>> {
    let headers_end = crate::shred::wire::SIZE_OF_CODING_SHRED_HEADERS;
    let shard_end = headers_end.checked_add(shard_len)?;
    let shard = packet.get(headers_end..shard_end)?;
    Some(shard.to_vec())
}

/// Computes the erasure shard length for `variant`.
///
/// The result is the coding shred payload size minus the coding headers and a trailer made of
/// the Merkle root, `proof_size` proof entries and, for resigned variants, one extra signature.
/// Returns `None` if any step overflows or the overhead exceeds the payload size.
fn coding_erasure_shard_len(variant: SetVariant) -> Option<usize> {
    let headers = crate::shred::wire::SIZE_OF_CODING_SHRED_HEADERS;
    let payload = crate::shred::wire::SIZE_OF_CODING_SHRED_PAYLOAD;

    let proof_bytes = usize::from(variant.proof_size).checked_mul(SIZE_OF_MERKLE_PROOF_ENTRY)?;
    let resign_bytes = if variant.resigned {
        SIZE_OF_SIGNATURE
    } else {
        0
    };
    let trailer = SIZE_OF_MERKLE_ROOT
        .checked_add(proof_bytes)?
        .checked_add(resign_bytes)?;
    let overhead = headers.checked_add(trailer)?;
    payload.checked_sub(overhead)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a set holding one-byte data shards at the given `(index, payload byte)` pairs
    /// under a 2-data / 1-coding config anchored at FEC set index 10, with the in-config
    /// counter recomputed from the stored shards.
    fn set_with_data_shards(shards: &[(u32, u8)]) -> ErasureSet {
        let mut set = ErasureSet::new([0; SIZE_OF_SIGNATURE]);
        for &(index, byte) in shards {
            let _ = set.data_shards.insert(index, vec![byte]);
        }
        set.config = Some(ErasureConfig {
            num_data: 2,
            num_coding: 1,
        });
        set.config_fec_set_index = Some(10);
        set.present_data_shreds_in_config = set.count_present_data_shreds_in_config(10);
        set
    }

    #[test]
    fn purge_older_than_removes_old_sets() {
        let mut recoverer = FecRecoverer::new(16, 64);
        for slot in [100_u64, 120, 140] {
            let _ = recoverer
                .sets
                .insert((slot, 0), ErasureSet::new([0; SIZE_OF_SIGNATURE]));
        }

        // Slots 100 and 120 are below the floor; only slot 140 survives.
        let purged = recoverer.purge_older_than(121);

        assert_eq!(purged, 2);
        assert_eq!(recoverer.tracked_sets(), 1);
        assert!(recoverer.sets.contains_key(&(140, 0)));
    }

    #[test]
    fn data_completeness_tracks_in_range_count_once_config_is_known() {
        // Indices 10 and 11 fill the two data slots of the config.
        let set = set_with_data_shards(&[(10, 1), (11, 2)]);
        assert!(set.is_data_complete_for_config(10));

        // Index 12 lies past the two-shard data range, so only one in-range shard is present.
        let incomplete_set = set_with_data_shards(&[(10, 1), (12, 2)]);
        assert!(!incomplete_set.is_data_complete_for_config(10));
    }
}