1use crate::{Index, IndexType};
5use async_trait::async_trait;
6use lance_core::cache::DeepSizeOf;
7use lance_core::Error;
8use lance_table::format::pb;
9use lance_table::rowids::segment::U64Segment;
10use prost::Message;
11use roaring::RoaringBitmap;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
/// Name associated with the MemWAL index: `"__lance_mem_wal"`.
// NOTE(review): presumably the reserved name under which the MemWAL index is
// registered in a dataset — confirm against the call sites.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
21pub enum State {
22 Open,
23 Sealed,
24 Flushed,
25 Merged,
26}
27
28impl From<State> for pb::mem_wal_index_details::mem_wal::State {
29 fn from(state: State) -> Self {
30 match state {
31 State::Open => Self::Open,
32 State::Sealed => Self::Sealed,
33 State::Flushed => Self::Flushed,
34 State::Merged => Self::Merged,
35 }
36 }
37}
38
39impl TryFrom<pb::mem_wal_index_details::mem_wal::State> for State {
40 type Error = Error;
41
42 fn try_from(state: pb::mem_wal_index_details::mem_wal::State) -> lance_core::Result<Self> {
43 match state {
44 pb::mem_wal_index_details::mem_wal::State::Open => Ok(Self::Open),
45 pb::mem_wal_index_details::mem_wal::State::Sealed => Ok(Self::Sealed),
46 pb::mem_wal_index_details::mem_wal::State::Flushed => Ok(Self::Flushed),
47 pb::mem_wal_index_details::mem_wal::State::Merged => Ok(Self::Merged),
48 }
49 }
50}
51
52impl TryFrom<i32> for State {
53 type Error = Error;
54
55 fn try_from(value: i32) -> lance_core::Result<Self> {
56 match value {
57 0 => Ok(Self::Open),
58 1 => Ok(Self::Sealed),
59 2 => Ok(Self::Flushed),
60 3 => Ok(Self::Merged),
61 _ => Err(Error::invalid_input(
62 format!("Unknown MemWAL state value: {}", value),
63 location!(),
64 )),
65 }
66 }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
70pub struct MemWalId {
71 pub region: String,
72 pub generation: u64,
73}
74
75impl From<&MemWalId> for pb::mem_wal_index_details::MemWalId {
76 fn from(mem_wal: &MemWalId) -> Self {
77 Self {
78 region: mem_wal.region.clone(),
79 generation: mem_wal.generation,
80 }
81 }
82}
83
84impl TryFrom<pb::mem_wal_index_details::MemWalId> for MemWalId {
85 type Error = Error;
86
87 fn try_from(mem_wal: pb::mem_wal_index_details::MemWalId) -> lance_core::Result<Self> {
88 Ok(Self {
89 region: mem_wal.region.clone(),
90 generation: mem_wal.generation,
91 })
92 }
93}
94
95impl MemWalId {
96 pub fn new(region: &str, generation: u64) -> Self {
97 Self {
98 region: region.to_owned(),
99 generation,
100 }
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
105pub struct MemWal {
106 pub id: MemWalId,
107 pub mem_table_location: String,
108 pub wal_location: String,
109 pub wal_entries: Vec<u8>,
110 pub state: State,
111 pub owner_id: String,
112 pub last_updated_dataset_version: u64,
113}
114
115impl From<&MemWal> for pb::mem_wal_index_details::MemWal {
116 fn from(mem_wal: &MemWal) -> Self {
117 Self {
118 id: Some(pb::mem_wal_index_details::MemWalId::from(&mem_wal.id)),
119 mem_table_location: mem_wal.mem_table_location.clone(),
120 wal_location: mem_wal.wal_location.clone(),
121 wal_entries: mem_wal.wal_entries.clone(),
122 state: pb::mem_wal_index_details::mem_wal::State::from(mem_wal.state.clone()) as i32,
123 owner_id: mem_wal.owner_id.clone(),
124 last_updated_dataset_version: mem_wal.last_updated_dataset_version,
125 }
126 }
127}
128
129impl TryFrom<pb::mem_wal_index_details::MemWal> for MemWal {
130 type Error = Error;
131
132 fn try_from(mem_wal: pb::mem_wal_index_details::MemWal) -> lance_core::Result<Self> {
133 let state = State::try_from(mem_wal.state)?;
134
135 Ok(Self {
136 id: MemWalId::try_from(mem_wal.id.unwrap())?,
137 mem_table_location: mem_wal.mem_table_location.clone(),
138 wal_location: mem_wal.wal_location.clone(),
139 wal_entries: mem_wal.wal_entries,
140 state,
141 owner_id: mem_wal.owner_id,
142 last_updated_dataset_version: mem_wal.last_updated_dataset_version,
143 })
144 }
145}
146
147impl MemWal {
148 pub fn new_empty(
149 id: MemWalId,
150 mem_table_location: &str,
151 wal_location: &str,
152 owner_id: &str,
153 ) -> Self {
154 Self {
155 id,
156 mem_table_location: mem_table_location.to_owned(),
157 wal_location: wal_location.to_owned(),
158 wal_entries: pb::U64Segment::from(U64Segment::Range(0..0)).encode_to_vec(),
159 state: State::Open,
160 owner_id: owner_id.to_owned(),
161 last_updated_dataset_version: 0, }
163 }
164
165 pub fn wal_entries(&self) -> U64Segment {
166 U64Segment::try_from(pb::U64Segment::decode(self.wal_entries.as_slice()).unwrap()).unwrap()
167 }
168
169 pub fn check_state(&self, expected: State) -> lance_core::Result<()> {
171 if self.state != expected {
172 return Err(Error::invalid_input(
173 format!(
174 "MemWAL {:?} is in state {:?}, but expected {:?}",
175 self.id, self.state, expected
176 ),
177 location!(),
178 ));
179 }
180 Ok(())
181 }
182
183 pub fn check_expected_owner_id(&self, expected: &str) -> lance_core::Result<()> {
184 if self.owner_id != expected {
185 return Err(Error::invalid_input(
186 format!(
187 "MemWAL {:?} has owner_id: {}, but expected {}",
188 self.id, self.owner_id, expected
189 ),
190 location!(),
191 ));
192 }
193 Ok(())
194 }
195}
196
197#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
198pub struct MemWalIndexDetails {
199 pub mem_wal_list: Vec<MemWal>,
200}
201
202impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
203 fn from(details: &MemWalIndexDetails) -> Self {
204 Self {
205 mem_wal_list: details.mem_wal_list.iter().map(|m| m.into()).collect(),
206 }
207 }
208}
209
210impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
211 type Error = Error;
212
213 fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
214 Ok(Self {
215 mem_wal_list: details
216 .mem_wal_list
217 .into_iter()
218 .map(MemWal::try_from)
219 .collect::<lance_core::Result<_>>()?,
220 })
221 }
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
225pub struct MemWalIndex {
226 pub mem_wal_map: HashMap<String, BTreeMap<u64, MemWal>>,
227}
228
229impl MemWalIndex {
230 pub fn new(details: MemWalIndexDetails) -> Self {
231 let mut mem_wal_map: HashMap<String, BTreeMap<u64, MemWal>> = HashMap::new();
232 for mem_wal in details.mem_wal_list.into_iter() {
233 if let Some(generations) = mem_wal_map.get_mut(&mem_wal.id.region) {
234 generations.insert(mem_wal.id.generation, mem_wal);
235 } else {
236 mem_wal_map.insert(
237 mem_wal.id.region.clone(),
238 std::iter::once((mem_wal.id.generation, mem_wal)).collect(),
239 );
240 }
241 }
242
243 Self { mem_wal_map }
244 }
245}
246
247#[derive(Serialize)]
248struct MemWalStatistics {
249 num_mem_wal: u64,
250 num_regions: u64,
251}
252
253#[async_trait]
254impl Index for MemWalIndex {
255 fn as_any(&self) -> &dyn Any {
256 self
257 }
258
259 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
260 self
261 }
262
263 fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
264 Err(Error::NotSupported {
265 source: "FragReuseIndex is not a vector index".into(),
266 location: location!(),
267 })
268 }
269
270 fn statistics(&self) -> lance_core::Result<serde_json::Value> {
271 let stats = MemWalStatistics {
272 num_mem_wal: self.mem_wal_map.values().map(|m| m.len()).sum::<usize>() as u64,
273 num_regions: self.mem_wal_map.len() as u64,
274 };
275 serde_json::to_value(stats).map_err(|e| Error::Internal {
276 message: format!("failed to serialize MemWAL index statistics: {}", e),
277 location: location!(),
278 })
279 }
280
281 async fn prewarm(&self) -> lance_core::Result<()> {
282 Ok(())
283 }
284
285 fn index_type(&self) -> IndexType {
286 IndexType::MemWal
287 }
288
289 async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
290 unimplemented!()
291 }
292}