1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct ServerlessSecondaryIndexEntry {
5 pub collection: String,
6 pub range_start: Vec<u8>,
7 pub range_end: Vec<u8>,
8 pub relative_path: PathBuf,
9 pub offset: u64,
10 pub bytes: u64,
11 pub checksum: u32,
12 pub content_hash: ServerlessContentHash,
13 pub hot: bool,
14}
15
16impl From<&ServerlessExtentRef> for ServerlessSecondaryIndexEntry {
17 fn from(extent: &ServerlessExtentRef) -> Self {
18 Self {
19 collection: extent.collection.clone(),
20 range_start: extent.range_start.clone(),
21 range_end: extent.range_end.clone(),
22 relative_path: extent.relative_path.clone(),
23 offset: extent.offset,
24 bytes: extent.bytes,
25 checksum: extent.checksum,
26 content_hash: extent.content_hash,
27 hot: extent.hot,
28 }
29 }
30}
31
32impl ServerlessSecondaryIndexEntry {
33 pub fn hydration_request(&self) -> ServerlessHydrationRequest {
34 ServerlessHydrationRequest {
35 relative_path: self.relative_path.clone(),
36 offset: self.offset,
37 bytes: self.bytes,
38 checksum: self.checksum,
39 content_hash: self.content_hash,
40 }
41 }
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct ServerlessSecondaryIndex {
46 pub generation: u64,
47 pub entries: Vec<ServerlessSecondaryIndexEntry>,
48}
49
50impl ServerlessSecondaryIndex {
51 pub fn new(generation: u64) -> Self {
52 Self {
53 generation,
54 entries: Vec::new(),
55 }
56 }
57
58 pub fn from_extent_index(index: &ServerlessExtentIndex) -> Self {
59 let mut secondary = Self::new(index.generation);
60 for extent in &index.extents {
61 secondary.push(ServerlessSecondaryIndexEntry::from(extent));
62 }
63 secondary
64 }
65
66 pub fn push(&mut self, entry: ServerlessSecondaryIndexEntry) {
67 self.entries.push(entry);
68 self.entries.sort_by(|left, right| {
69 (
70 left.collection.as_str(),
71 left.range_start.as_slice(),
72 left.range_end.as_slice(),
73 left.relative_path.to_string_lossy(),
74 left.offset,
75 )
76 .cmp(&(
77 right.collection.as_str(),
78 right.range_start.as_slice(),
79 right.range_end.as_slice(),
80 right.relative_path.to_string_lossy(),
81 right.offset,
82 ))
83 });
84 }
85
86 pub fn entries_for_collection(&self, collection: &str) -> Vec<&ServerlessSecondaryIndexEntry> {
87 self.entries
88 .iter()
89 .filter(|entry| entry.collection == collection)
90 .collect()
91 }
92
93 pub fn hydration_plan_for_collection(&self, collection: &str) -> ServerlessHydrationPlan {
94 ServerlessHydrationPlan {
95 generation: self.generation,
96 requests: self
97 .entries_for_collection(collection)
98 .into_iter()
99 .map(ServerlessSecondaryIndexEntry::hydration_request)
100 .collect(),
101 }
102 }
103
104 pub fn write_to_path(&self, path: impl AsRef<Path>) -> RdbFileResult<()> {
105 write_bytes(path, &self.encode())
106 }
107
108 pub fn read_from_path(path: impl AsRef<Path>) -> RdbFileResult<Self> {
109 Self::decode(&fs::read(path)?)
110 }
111
112 pub fn encode(&self) -> Vec<u8> {
113 let mut out = Vec::new();
114 out.extend_from_slice(SERVERLESS_SECONDARY_INDEX_MAGIC);
115 put_u16(&mut out, SERVERLESS_ARTIFACT_VERSION);
116 put_u64(&mut out, self.generation);
117 put_u32(&mut out, self.entries.len() as u32);
118 for entry in &self.entries {
119 put_string(&mut out, &entry.collection);
120 put_bytes(&mut out, &entry.range_start);
121 put_bytes(&mut out, &entry.range_end);
122 put_string(&mut out, &entry.relative_path.to_string_lossy());
123 put_u64(&mut out, entry.offset);
124 put_u64(&mut out, entry.bytes);
125 put_u32(&mut out, entry.checksum);
126 put_content_hash(&mut out, entry.content_hash);
127 out.push(u8::from(entry.hot));
128 }
129 let checksum = crc32(&out);
130 put_u32(&mut out, checksum);
131 out
132 }
133
134 pub fn decode(bytes: &[u8]) -> RdbFileResult<Self> {
135 verify_checksum(bytes)?;
136 let mut cursor = 0usize;
137 expect_magic(bytes, &mut cursor, SERVERLESS_SECONDARY_INDEX_MAGIC)?;
138 let version = take_u16(bytes, &mut cursor)?;
139 if version != SERVERLESS_ARTIFACT_VERSION {
140 return Err(RdbFileError::InvalidOperation(format!(
141 "unsupported serverless secondary index version {version}"
142 )));
143 }
144 let generation = take_u64(bytes, &mut cursor)?;
145 let count = take_u32(bytes, &mut cursor)? as usize;
146 let mut index = Self::new(generation);
147 for _ in 0..count {
148 let collection = take_string(bytes, &mut cursor)?;
149 let range_start = take_vec_bytes(bytes, &mut cursor)?;
150 let range_end = take_vec_bytes(bytes, &mut cursor)?;
151 if !range_end.is_empty() && range_start >= range_end {
152 return Err(RdbFileError::InvalidOperation(
153 "serverless secondary index range_start must be before range_end".into(),
154 ));
155 }
156 index.push(ServerlessSecondaryIndexEntry {
157 collection,
158 range_start,
159 range_end,
160 relative_path: PathBuf::from(take_string(bytes, &mut cursor)?),
161 offset: take_u64(bytes, &mut cursor)?,
162 bytes: take_u64(bytes, &mut cursor)?,
163 checksum: take_u32(bytes, &mut cursor)?,
164 content_hash: take_content_hash(bytes, &mut cursor)?,
165 hot: take_u8(bytes, &mut cursor)? != 0,
166 });
167 }
168 reject_trailing_bytes(bytes, cursor)?;
169 Ok(index)
170 }
171}