sof 0.17.1

Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation
Documentation
use super::DataSetReassembler;
use super::PayloadFragmentBatch;
use std::time::{Duration, Instant};

/// In-order ingestion for slot 7: shred 0 alone yields nothing, and shred 1
/// flagged `data_complete` releases exactly one dataset spanning indices 0
/// through 1 with both payload fragments in index order.
#[test]
fn completes_dataset_on_data_complete() {
    let mut reassembler = DataSetReassembler::new(16);

    // The first shred does not carry the completion flag, so nothing is emitted.
    assert!(ingest(&mut reassembler, 7, 0, false, false, vec![0]).is_empty());

    let completed = ingest(&mut reassembler, 7, 1, true, false, vec![1]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![0], vec![1]]);
    assert_eq!(dataset.payload_fragments, expected);
    assert_eq!(dataset.start_index, 0);
    assert_eq!(dataset.end_index, 1);
}

/// Reversed arrival order for slot 9: the `data_complete` shred at index 1
/// arrives first and yields nothing; once shred 0 follows, a single dataset
/// covering indices 0 through 1 is emitted with fragments in index order.
#[test]
fn handles_out_of_order_fragments() {
    let mut reassembler = DataSetReassembler::new(16);

    // The completing shred shows up before its predecessor.
    assert!(ingest(&mut reassembler, 9, 1, true, false, vec![1]).is_empty());

    let completed = ingest(&mut reassembler, 9, 0, false, false, vec![0]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.start_index, 0);
    assert_eq!(dataset.end_index, 1);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![0], vec![1]]);
    assert_eq!(dataset.payload_fragments, expected);
}

/// A lone `data_complete` shred at index 1, with nothing else ingested for
/// slot 10, must not produce a dataset.
#[test]
fn suppresses_single_shred_tail_without_anchor() {
    let mut reassembler = DataSetReassembler::new(16);
    assert!(ingest(&mut reassembler, 10, 1, true, false, vec![1]).is_empty());
}

/// For slot 11 the completing shred (index 1) is ingested first and its result
/// ignored; the late arrival of shred 0 must then emit one dataset spanning
/// indices 0 through 1.
#[test]
fn reanchors_after_late_prefix_arrival() {
    let mut reassembler = DataSetReassembler::new(16);

    // Result intentionally discarded: only the follow-up ingestion is checked.
    let _ = ingest(&mut reassembler, 11, 1, true, false, vec![1]);

    let completed = ingest(&mut reassembler, 11, 0, false, false, vec![0]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.start_index, 0);
    assert_eq!(dataset.end_index, 1);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![0], vec![1]]);
    assert_eq!(dataset.payload_fragments, expected);
}

/// Slot 12 receives its second pair of shreds (2, 3) before its first pair
/// (0, 1). Each pair ends with a `data_complete` shred and must be emitted as
/// its own dataset at the moment that completing shred is ingested.
#[test]
fn reanchors_on_consecutive_completed_boundaries() {
    let mut reassembler = DataSetReassembler::new(16);

    // Later pair first: indices 2 and 3.
    assert!(ingest(&mut reassembler, 12, 2, false, false, vec![2]).is_empty());
    let later_pair = ingest(&mut reassembler, 12, 3, true, false, vec![3]);
    assert_eq!(later_pair.len(), 1);
    let later = &later_pair[0];
    assert_eq!(later.start_index, 2);
    assert_eq!(later.end_index, 3);
    let expected_later = PayloadFragmentBatch::from_owned_fragments(vec![vec![2], vec![3]]);
    assert_eq!(later.payload_fragments, expected_later);

    // Earlier pair afterwards: indices 0 and 1.
    assert!(ingest(&mut reassembler, 12, 0, false, false, vec![0]).is_empty());
    let earlier_pair = ingest(&mut reassembler, 12, 1, true, false, vec![1]);
    assert_eq!(earlier_pair.len(), 1);
    let earlier = &earlier_pair[0];
    assert_eq!(earlier.start_index, 0);
    assert_eq!(earlier.end_index, 1);
    let expected_earlier = PayloadFragmentBatch::from_owned_fragments(vec![vec![0], vec![1]]);
    assert_eq!(earlier.payload_fragments, expected_earlier);
}

/// Slot 15 only ever sees shreds 5 and 6. Shred 5 yields nothing; shred 6,
/// flagged `data_complete`, must emit one dataset spanning indices 5 through 6
/// even though no earlier shred of the slot was ingested.
#[test]
fn emits_tail_without_known_boundary_anchor() {
    let mut reassembler = DataSetReassembler::new(16);

    assert!(ingest(&mut reassembler, 15, 5, false, false, vec![5]).is_empty());

    let completed = ingest(&mut reassembler, 15, 6, true, false, vec![6]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.start_index, 5);
    assert_eq!(dataset.end_index, 6);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![5], vec![6]]);
    assert_eq!(dataset.payload_fragments, expected);
}

/// Slot 30 never receives shred 0. The `data_complete` shred at index 1 and
/// the plain shred at index 2 each yield nothing, but the `data_complete`
/// shred at index 3 must emit the dataset spanning indices 2 through 3.
#[test]
fn emits_following_dataset_with_missing_prefix_dataset() {
    let mut reassembler = DataSetReassembler::new(16);

    // Index 0 is absent, so the first boundary alone produces no output.
    assert!(ingest(&mut reassembler, 30, 1, true, false, vec![1]).is_empty());
    assert!(ingest(&mut reassembler, 30, 2, false, false, vec![2]).is_empty());

    let completed = ingest(&mut reassembler, 30, 3, true, false, vec![3]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.start_index, 2);
    assert_eq!(dataset.end_index, 3);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![2], vec![3]]);
    assert_eq!(dataset.payload_fragments, expected);
}

/// Slot 31 first receives shreds 1 (`data_complete`), 2 and 3
/// (`data_complete`) with their results ignored. When the missing shred 0
/// finally arrives, exactly one dataset spanning indices 0 through 1 must be
/// emitted.
#[test]
fn emits_missing_prefix_dataset_if_gap_arrives_later() {
    let mut reassembler = DataSetReassembler::new(16);

    // Setup only: (index, data_complete, payload byte); outputs are discarded.
    for (index, data_complete, byte) in [(1, true, 1), (2, false, 2), (3, true, 3)] {
        let _ = ingest(&mut reassembler, 31, index, data_complete, false, vec![byte]);
    }

    let completed = ingest(&mut reassembler, 31, 0, false, false, vec![0]);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.start_index, 0);
    assert_eq!(dataset.end_index, 1);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![0], vec![1]]);
    assert_eq!(dataset.payload_fragments, expected);
}

/// Slot 40: after a `data_complete` shred at index 1, the `last_in_slot` shred
/// at index 2 emits a one-shred dataset while the slot is still tracked. The
/// late shred 0 then emits the dataset for indices 0 through 1, after which
/// the reassembler reports no tracked slots.
#[test]
fn late_prefix_after_last_boundary_still_finishes_slot() {
    let mut reassembler = DataSetReassembler::new(16);
    let _ = ingest(&mut reassembler, 40, 1, true, false, vec![1]);

    // Final shred of the slot arrives while index 0 is still missing.
    let tail = ingest(&mut reassembler, 40, 2, false, true, vec![2]);
    assert_eq!(tail.len(), 1);
    let tail_dataset = &tail[0];
    assert_eq!(tail_dataset.start_index, 2);
    assert_eq!(tail_dataset.end_index, 2);
    assert_eq!(reassembler.tracked_slots(), 1);

    // The missing prefix shows up last.
    let prefix = ingest(&mut reassembler, 40, 0, false, false, vec![0]);
    assert_eq!(prefix.len(), 1);
    let prefix_dataset = &prefix[0];
    assert_eq!(prefix_dataset.start_index, 0);
    assert_eq!(prefix_dataset.end_index, 1);
    assert_eq!(reassembler.tracked_slots(), 0);
}

/// Ingests two shreds of slot 50 with explicit observation instants two
/// milliseconds apart and checks that the emitted dataset reports the earlier
/// instant as `first_shred_observed_at` and the later one as
/// `last_shred_observed_at`.
#[test]
fn completed_dataset_tracks_first_and_last_shred_observed_times() {
    let mut reassembler = DataSetReassembler::new(16);

    let earlier = Instant::now();
    let later = earlier
        .checked_add(Duration::from_millis(2))
        .expect("expected later instant");

    let opening = super::SharedPayloadFragment::owned(vec![0]);
    let pending = reassembler.ingest_data_shred_meta_at(50, 0, false, false, opening, earlier);
    assert!(pending.is_empty());

    let closing = super::SharedPayloadFragment::owned(vec![1]);
    let completed = reassembler.ingest_data_shred_meta_at(50, 1, true, false, closing, later);
    assert_eq!(completed.len(), 1);

    let dataset = &completed[0];
    assert_eq!(dataset.first_shred_observed_at, earlier);
    assert_eq!(dataset.last_shred_observed_at, later);
}

/// Slot 60: shreds 0 and 1 complete a dataset, then shreds 2 and 3 arrive
/// without a completion flag. `inline_contiguous_dataset` must report the
/// still-open run spanning indices 2 through 3, with its payload fragments and
/// the per-fragment observation instants in index order.
#[test]
fn inline_contiguous_dataset_tracks_open_prefix_after_completed_boundary() {
    let mut reassembler = DataSetReassembler::new(16);

    // Four observation instants, each one millisecond after the previous.
    let tick = Duration::from_millis(1);
    let t0 = Instant::now();
    let t1 = t0.checked_add(tick).expect("expected later instant");
    let t2 = t1.checked_add(tick).expect("expected later instant");
    let t3 = t2.checked_add(tick).expect("expected later instant");

    // First dataset: indices 0 and 1, closed by the data-complete flag.
    let fragment0 = super::SharedPayloadFragment::owned(vec![0]);
    let after0 = reassembler.ingest_data_shred_meta_at(60, 0, false, false, fragment0, t0);
    assert!(after0.is_empty());

    let fragment1 = super::SharedPayloadFragment::owned(vec![1]);
    let after1 = reassembler.ingest_data_shred_meta_at(60, 1, true, false, fragment1, t1);
    assert_eq!(after1.len(), 1);

    // Open run: indices 2 and 3, neither flagged as complete.
    let fragment2 = super::SharedPayloadFragment::owned(vec![2]);
    let after2 = reassembler.ingest_data_shred_meta_at(60, 2, false, false, fragment2, t2);
    assert!(after2.is_empty());

    let fragment3 = super::SharedPayloadFragment::owned(vec![3]);
    let after3 = reassembler.ingest_data_shred_meta_at(60, 3, false, false, fragment3, t3);
    assert!(after3.is_empty());

    let open_run = reassembler
        .inline_contiguous_dataset(60)
        .expect("inline contiguous dataset");
    assert_eq!(open_run.start_index, 2);
    assert_eq!(open_run.end_index, 3);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![2], vec![3]]);
    assert_eq!(open_run.payload_fragments, expected);
    assert_eq!(open_run.fragment_observed_ats, vec![t2, t3]);
}

/// Slot 61 only receives shreds 2 and 3, neither flagged `data_complete`;
/// `inline_contiguous_datasets` must report nothing for that slot.
#[test]
fn inline_contiguous_dataset_requires_anchor() {
    let mut reassembler = DataSetReassembler::new(16);

    // Setup only: (index, payload byte); ingestion results are discarded.
    for (index, byte) in [(2, 2), (3, 3)] {
        let _ = ingest(&mut reassembler, 61, index, false, false, vec![byte]);
    }

    let inline = reassembler.inline_contiguous_datasets(61);
    assert!(inline.is_empty());
}

/// Slot 62 receives a `data_complete` shred at index 1 followed by plain
/// shreds 2 and 3. `inline_contiguous_datasets` must report exactly one entry
/// spanning indices 2 through 3.
#[test]
fn inline_contiguous_datasets_include_following_chain_after_known_boundary() {
    let mut reassembler = DataSetReassembler::new(16);

    // Setup only: (index, data_complete, payload byte); outputs are discarded.
    for (index, data_complete, byte) in [(1, true, 1), (2, false, 2), (3, false, 3)] {
        let _ = ingest(&mut reassembler, 62, index, data_complete, false, vec![byte]);
    }

    let inline = reassembler.inline_contiguous_datasets(62);
    assert_eq!(inline.len(), 1);

    let chain = &inline[0];
    assert_eq!(chain.start_index, 2);
    assert_eq!(chain.end_index, 3);
    let expected = PayloadFragmentBatch::from_owned_fragments(vec![vec![2], vec![3]]);
    assert_eq!(chain.payload_fragments, expected);
}

/// Slot 63 receives shreds 2 and 3 (`data_complete`), then 4 and 5, then the
/// late shred 0. `inline_contiguous_datasets` must report two entries in
/// order: the single-shred run at index 0 and the run spanning indices 4
/// through 5.
#[test]
fn inline_contiguous_datasets_track_late_prefix_and_current_chain() {
    let mut reassembler = DataSetReassembler::new(16);

    // Setup only: (index, data_complete, payload byte); outputs are discarded.
    let arrivals = [
        (2, false, 2),
        (3, true, 3),
        (4, false, 4),
        (5, false, 5),
        (0, false, 0),
    ];
    for (index, data_complete, byte) in arrivals {
        let _ = ingest(&mut reassembler, 63, index, data_complete, false, vec![byte]);
    }

    let inline = reassembler.inline_contiguous_datasets(63);
    assert_eq!(inline.len(), 2);

    let late_prefix = &inline[0];
    assert_eq!(late_prefix.start_index, 0);
    assert_eq!(late_prefix.end_index, 0);
    let expected_prefix = PayloadFragmentBatch::from_owned_fragments(vec![vec![0]]);
    assert_eq!(late_prefix.payload_fragments, expected_prefix);

    let current_chain = &inline[1];
    assert_eq!(current_chain.start_index, 4);
    assert_eq!(current_chain.end_index, 5);
    let expected_chain = PayloadFragmentBatch::from_owned_fragments(vec![vec![4], vec![5]]);
    assert_eq!(current_chain.payload_fragments, expected_chain);
}

/// Test helper: wraps `serialized_shred` in an owned `SharedPayloadFragment`
/// and forwards it, together with the shred metadata, to
/// `DataSetReassembler::ingest_data_shred_meta`, returning that call's result.
fn ingest(
    reassembler: &mut DataSetReassembler,
    slot: u64,
    index: u32,
    data_complete: bool,
    last_in_slot: bool,
    serialized_shred: Vec<u8>,
) -> Vec<super::CompletedDataSet> {
    let fragment = super::SharedPayloadFragment::owned(serialized_shred);
    reassembler.ingest_data_shred_meta(slot, index, data_complete, last_in_slot, fragment)
}