lance_index/
mem_wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use crate::{Index, IndexType};
5use async_trait::async_trait;
6use lance_core::cache::DeepSizeOf;
7use lance_core::Error;
8use lance_table::format::pb;
9use lance_table::rowids::segment::U64Segment;
10use prost::Message;
11use roaring::RoaringBitmap;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14use std::any::Any;
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17
/// Name under which the MemWAL index is registered.
pub const MEM_WAL_INDEX_NAME: &str = "__lance_mem_wal";
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
21pub enum State {
22    Open,
23    Sealed,
24    Flushed,
25    Merged,
26}
27
28impl From<State> for pb::mem_wal_index_details::mem_wal::State {
29    fn from(state: State) -> Self {
30        match state {
31            State::Open => Self::Open,
32            State::Sealed => Self::Sealed,
33            State::Flushed => Self::Flushed,
34            State::Merged => Self::Merged,
35        }
36    }
37}
38
39impl TryFrom<pb::mem_wal_index_details::mem_wal::State> for State {
40    type Error = Error;
41
42    fn try_from(state: pb::mem_wal_index_details::mem_wal::State) -> lance_core::Result<Self> {
43        match state {
44            pb::mem_wal_index_details::mem_wal::State::Open => Ok(Self::Open),
45            pb::mem_wal_index_details::mem_wal::State::Sealed => Ok(Self::Sealed),
46            pb::mem_wal_index_details::mem_wal::State::Flushed => Ok(Self::Flushed),
47            pb::mem_wal_index_details::mem_wal::State::Merged => Ok(Self::Merged),
48        }
49    }
50}
51
52impl TryFrom<i32> for State {
53    type Error = Error;
54
55    fn try_from(value: i32) -> lance_core::Result<Self> {
56        match value {
57            0 => Ok(Self::Open),
58            1 => Ok(Self::Sealed),
59            2 => Ok(Self::Flushed),
60            3 => Ok(Self::Merged),
61            _ => Err(Error::invalid_input(
62                format!("Unknown MemWAL state value: {}", value),
63                location!(),
64            )),
65        }
66    }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
70pub struct MemWalId {
71    pub region: String,
72    pub generation: u64,
73}
74
75impl From<&MemWalId> for pb::mem_wal_index_details::MemWalId {
76    fn from(mem_wal: &MemWalId) -> Self {
77        Self {
78            region: mem_wal.region.clone(),
79            generation: mem_wal.generation,
80        }
81    }
82}
83
84impl TryFrom<pb::mem_wal_index_details::MemWalId> for MemWalId {
85    type Error = Error;
86
87    fn try_from(mem_wal: pb::mem_wal_index_details::MemWalId) -> lance_core::Result<Self> {
88        Ok(Self {
89            region: mem_wal.region.clone(),
90            generation: mem_wal.generation,
91        })
92    }
93}
94
95impl MemWalId {
96    pub fn new(region: &str, generation: u64) -> Self {
97        Self {
98            region: region.to_owned(),
99            generation,
100        }
101    }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Serialize, Deserialize, DeepSizeOf)]
105pub struct MemWal {
106    pub id: MemWalId,
107    pub mem_table_location: String,
108    pub wal_location: String,
109    pub wal_entries: Vec<u8>,
110    pub state: State,
111    pub owner_id: String,
112    pub last_updated_dataset_version: u64,
113}
114
115impl From<&MemWal> for pb::mem_wal_index_details::MemWal {
116    fn from(mem_wal: &MemWal) -> Self {
117        Self {
118            id: Some(pb::mem_wal_index_details::MemWalId::from(&mem_wal.id)),
119            mem_table_location: mem_wal.mem_table_location.clone(),
120            wal_location: mem_wal.wal_location.clone(),
121            wal_entries: mem_wal.wal_entries.clone(),
122            state: pb::mem_wal_index_details::mem_wal::State::from(mem_wal.state.clone()) as i32,
123            owner_id: mem_wal.owner_id.clone(),
124            last_updated_dataset_version: mem_wal.last_updated_dataset_version,
125        }
126    }
127}
128
129impl TryFrom<pb::mem_wal_index_details::MemWal> for MemWal {
130    type Error = Error;
131
132    fn try_from(mem_wal: pb::mem_wal_index_details::MemWal) -> lance_core::Result<Self> {
133        let state = State::try_from(mem_wal.state)?;
134
135        Ok(Self {
136            id: MemWalId::try_from(mem_wal.id.unwrap())?,
137            mem_table_location: mem_wal.mem_table_location.clone(),
138            wal_location: mem_wal.wal_location.clone(),
139            wal_entries: mem_wal.wal_entries,
140            state,
141            owner_id: mem_wal.owner_id,
142            last_updated_dataset_version: mem_wal.last_updated_dataset_version,
143        })
144    }
145}
146
147impl MemWal {
148    pub fn new_empty(
149        id: MemWalId,
150        mem_table_location: &str,
151        wal_location: &str,
152        owner_id: &str,
153    ) -> Self {
154        Self {
155            id,
156            mem_table_location: mem_table_location.to_owned(),
157            wal_location: wal_location.to_owned(),
158            wal_entries: pb::U64Segment::from(U64Segment::Range(0..0)).encode_to_vec(),
159            state: State::Open,
160            owner_id: owner_id.to_owned(),
161            last_updated_dataset_version: 0, // placeholder, this will be filled during build_manifest
162        }
163    }
164
165    pub fn wal_entries(&self) -> U64Segment {
166        U64Segment::try_from(pb::U64Segment::decode(self.wal_entries.as_slice()).unwrap()).unwrap()
167    }
168
169    /// Check if the MemWAL is in the expected state
170    pub fn check_state(&self, expected: State) -> lance_core::Result<()> {
171        if self.state != expected {
172            return Err(Error::invalid_input(
173                format!(
174                    "MemWAL {:?} is in state {:?}, but expected {:?}",
175                    self.id, self.state, expected
176                ),
177                location!(),
178            ));
179        }
180        Ok(())
181    }
182
183    pub fn check_expected_owner_id(&self, expected: &str) -> lance_core::Result<()> {
184        if self.owner_id != expected {
185            return Err(Error::invalid_input(
186                format!(
187                    "MemWAL {:?} has owner_id: {}, but expected {}",
188                    self.id, self.owner_id, expected
189                ),
190                location!(),
191            ));
192        }
193        Ok(())
194    }
195}
196
197#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
198pub struct MemWalIndexDetails {
199    pub mem_wal_list: Vec<MemWal>,
200}
201
202impl From<&MemWalIndexDetails> for pb::MemWalIndexDetails {
203    fn from(details: &MemWalIndexDetails) -> Self {
204        Self {
205            mem_wal_list: details.mem_wal_list.iter().map(|m| m.into()).collect(),
206        }
207    }
208}
209
210impl TryFrom<pb::MemWalIndexDetails> for MemWalIndexDetails {
211    type Error = Error;
212
213    fn try_from(details: pb::MemWalIndexDetails) -> lance_core::Result<Self> {
214        Ok(Self {
215            mem_wal_list: details
216                .mem_wal_list
217                .into_iter()
218                .map(MemWal::try_from)
219                .collect::<lance_core::Result<_>>()?,
220        })
221    }
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
225pub struct MemWalIndex {
226    pub mem_wal_map: HashMap<String, BTreeMap<u64, MemWal>>,
227}
228
229impl MemWalIndex {
230    pub fn new(details: MemWalIndexDetails) -> Self {
231        let mut mem_wal_map: HashMap<String, BTreeMap<u64, MemWal>> = HashMap::new();
232        for mem_wal in details.mem_wal_list.into_iter() {
233            if let Some(generations) = mem_wal_map.get_mut(&mem_wal.id.region) {
234                generations.insert(mem_wal.id.generation, mem_wal);
235            } else {
236                mem_wal_map.insert(
237                    mem_wal.id.region.clone(),
238                    std::iter::once((mem_wal.id.generation, mem_wal)).collect(),
239                );
240            }
241        }
242
243        Self { mem_wal_map }
244    }
245}
246
247#[derive(Serialize)]
248struct MemWalStatistics {
249    num_mem_wal: u64,
250    num_regions: u64,
251}
252
253#[async_trait]
254impl Index for MemWalIndex {
255    fn as_any(&self) -> &dyn Any {
256        self
257    }
258
259    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
260        self
261    }
262
263    fn as_vector_index(self: Arc<Self>) -> lance_core::Result<Arc<dyn crate::vector::VectorIndex>> {
264        Err(Error::NotSupported {
265            source: "FragReuseIndex is not a vector index".into(),
266            location: location!(),
267        })
268    }
269
270    fn statistics(&self) -> lance_core::Result<serde_json::Value> {
271        let stats = MemWalStatistics {
272            num_mem_wal: self.mem_wal_map.values().map(|m| m.len()).sum::<usize>() as u64,
273            num_regions: self.mem_wal_map.len() as u64,
274        };
275        serde_json::to_value(stats).map_err(|e| Error::Internal {
276            message: format!("failed to serialize MemWAL index statistics: {}", e),
277            location: location!(),
278        })
279    }
280
281    async fn prewarm(&self) -> lance_core::Result<()> {
282        Ok(())
283    }
284
285    fn index_type(&self) -> IndexType {
286        IndexType::MemWal
287    }
288
289    async fn calculate_included_frags(&self) -> lance_core::Result<RoaringBitmap> {
290        unimplemented!()
291    }
292}