Skip to main content

apfsds_storage/
segment.rs

1//! Storage segment for MVCC
2
3use apfsds_protocol::ConnRecord;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6/// Segment ID counter
7static SEGMENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
8
9/// A storage segment containing connection records
10pub struct Segment {
11    /// Segment ID
12    pub id: u64,
13
14    /// Data buffer
15    data: Vec<u8>,
16
17    /// Record offsets
18    offsets: Vec<usize>,
19
20    /// Is this segment sealed (immutable)?
21    pub is_sealed: bool,
22
23    /// Size limit
24    size_limit: usize,
25}
26
27impl Segment {
28    /// Create a new segment
29    pub fn new() -> Self {
30        Self::with_size_limit(10 * 1024 * 1024) // 10MB default
31    }
32
33    /// Create with custom size limit
34    pub fn with_size_limit(size_limit: usize) -> Self {
35        Self {
36            id: SEGMENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
37            data: Vec::with_capacity(size_limit / 10),
38            offsets: Vec::new(),
39            is_sealed: false,
40            size_limit,
41        }
42    }
43
44    /// Append a record to the segment
45    pub fn append(&mut self, record: &ConnRecord) -> Option<usize> {
46        if self.is_sealed {
47            return None;
48        }
49
50        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(record).ok()?;
51
52        if self.data.len() + bytes.len() > self.size_limit {
53            return None;
54        }
55
56        let offset = self.data.len();
57        self.data.extend_from_slice(&bytes);
58        self.offsets.push(offset);
59
60        Some(offset)
61    }
62
63    /// Read a record at offset
64    pub fn read_at(&self, offset: usize) -> Option<ConnRecord> {
65        if offset >= self.data.len() {
66            return None;
67        }
68
69        // Find end of record (next offset or end of data)
70        let end = self
71            .offsets
72            .iter()
73            .find(|&&o| o > offset)
74            .copied()
75            .unwrap_or(self.data.len());
76
77        let bytes = &self.data[offset..end];
78
79        let archived =
80            rkyv::access::<apfsds_protocol::ArchivedConnRecord, rkyv::rancor::Error>(bytes).ok()?;
81        rkyv::deserialize::<ConnRecord, rkyv::rancor::Error>(archived).ok()
82    }
83
84    /// Get the current size
85    pub fn size(&self) -> usize {
86        self.data.len()
87    }
88
89    /// Get the number of records
90    pub fn record_count(&self) -> usize {
91        self.offsets.len()
92    }
93
94    /// Seal the segment (make immutable)
95    pub fn seal(&mut self) {
96        self.is_sealed = true;
97    }
98}
99
100impl Default for Segment {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106/// Pointer to a record in a segment
107#[derive(Debug, Clone, Copy)]
108pub struct SegmentPtr {
109    pub segment_id: u64,
110    pub offset: usize,
111}
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116    use apfsds_protocol::ConnMeta;
117
118    fn make_record(conn_id: u64) -> ConnRecord {
119        ConnRecord {
120            conn_id,
121            metadata: ConnMeta {
122                client_addr: [0; 16],
123                nat_entry: (1234, 5678),
124                assigned_pod: 1,
125                stream_states: vec![],
126            },
127            created_at: 0,
128            last_active: 0,
129            access_count: 0,
130            txid: 0,
131        }
132    }
133
134    #[test]
135    fn test_append_and_read() {
136        let mut segment = Segment::new();
137        let record = make_record(42);
138
139        let offset = segment.append(&record).unwrap();
140        let read_back = segment.read_at(offset).unwrap();
141
142        assert_eq!(read_back.conn_id, 42);
143    }
144
145    #[test]
146    fn test_multiple_records() {
147        let mut segment = Segment::new();
148
149        for i in 0..10 {
150            let record = make_record(i);
151            segment.append(&record).unwrap();
152        }
153
154        assert_eq!(segment.record_count(), 10);
155    }
156
157    #[test]
158    fn test_sealed_segment() {
159        let mut segment = Segment::new();
160        segment.seal();
161
162        let record = make_record(1);
163        assert!(segment.append(&record).is_none());
164    }
165}