apfsds_storage/
segment.rs1use apfsds_protocol::ConnRecord;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6static SEGMENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
8
9pub struct Segment {
11 pub id: u64,
13
14 data: Vec<u8>,
16
17 offsets: Vec<usize>,
19
20 pub is_sealed: bool,
22
23 size_limit: usize,
25}
26
27impl Segment {
28 pub fn new() -> Self {
30 Self::with_size_limit(10 * 1024 * 1024) }
32
33 pub fn with_size_limit(size_limit: usize) -> Self {
35 Self {
36 id: SEGMENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
37 data: Vec::with_capacity(size_limit / 10),
38 offsets: Vec::new(),
39 is_sealed: false,
40 size_limit,
41 }
42 }
43
44 pub fn append(&mut self, record: &ConnRecord) -> Option<usize> {
46 if self.is_sealed {
47 return None;
48 }
49
50 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(record).ok()?;
51
52 if self.data.len() + bytes.len() > self.size_limit {
53 return None;
54 }
55
56 let offset = self.data.len();
57 self.data.extend_from_slice(&bytes);
58 self.offsets.push(offset);
59
60 Some(offset)
61 }
62
63 pub fn read_at(&self, offset: usize) -> Option<ConnRecord> {
65 if offset >= self.data.len() {
66 return None;
67 }
68
69 let end = self
71 .offsets
72 .iter()
73 .find(|&&o| o > offset)
74 .copied()
75 .unwrap_or(self.data.len());
76
77 let bytes = &self.data[offset..end];
78
79 let archived =
80 rkyv::access::<apfsds_protocol::ArchivedConnRecord, rkyv::rancor::Error>(bytes).ok()?;
81 rkyv::deserialize::<ConnRecord, rkyv::rancor::Error>(archived).ok()
82 }
83
84 pub fn size(&self) -> usize {
86 self.data.len()
87 }
88
89 pub fn record_count(&self) -> usize {
91 self.offsets.len()
92 }
93
94 pub fn seal(&mut self) {
96 self.is_sealed = true;
97 }
98}
99
100impl Default for Segment {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106#[derive(Debug, Clone, Copy)]
108pub struct SegmentPtr {
109 pub segment_id: u64,
110 pub offset: usize,
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use apfsds_protocol::ConnMeta;
117
118 fn make_record(conn_id: u64) -> ConnRecord {
119 ConnRecord {
120 conn_id,
121 metadata: ConnMeta {
122 client_addr: [0; 16],
123 nat_entry: (1234, 5678),
124 assigned_pod: 1,
125 stream_states: vec![],
126 },
127 created_at: 0,
128 last_active: 0,
129 access_count: 0,
130 txid: 0,
131 }
132 }
133
134 #[test]
135 fn test_append_and_read() {
136 let mut segment = Segment::new();
137 let record = make_record(42);
138
139 let offset = segment.append(&record).unwrap();
140 let read_back = segment.read_at(offset).unwrap();
141
142 assert_eq!(read_back.conn_id, 42);
143 }
144
145 #[test]
146 fn test_multiple_records() {
147 let mut segment = Segment::new();
148
149 for i in 0..10 {
150 let record = make_record(i);
151 segment.append(&record).unwrap();
152 }
153
154 assert_eq!(segment.record_count(), 10);
155 }
156
157 #[test]
158 fn test_sealed_segment() {
159 let mut segment = Segment::new();
160 segment.seal();
161
162 let record = make_record(1);
163 assert!(segment.append(&record).is_none());
164 }
165}