csv_partial_cache/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3
4use std::{
5    fs::{self, File},
6    io::{self, BufRead, BufReader, Seek, SeekFrom},
7    marker::PhantomData,
8    path::{Path, PathBuf},
9    time::SystemTime,
10};
11
12use serde::{Serialize, de::DeserializeOwned};
13
14/// Represents an error that can occur in this crate.
15#[derive(Debug, thiserror::Error)]
16pub enum Error {
17    /// An I/O error occurred when opening a file.
18    #[error("can't open file: {1}")]
19    OpenFile(#[source] std::io::Error, PathBuf),
20    /// An I/O error occurred when creating a file.
21    #[error("can't create file: {1}")]
22    CreateFile(#[source] std::io::Error, PathBuf),
23    /// An I/O error occurred when reading file metadata.
24    #[error("can't read file metadata: {1}")]
25    ReadFileMetadata(#[source] std::io::Error, PathBuf),
26    /// An I/O error occurred when getting the file modification time.
27    #[error("can't get file modification time: {1}")]
28    GetFileModified(#[source] std::io::Error, PathBuf),
29    /// A deserialization error occurred for a line.
30    #[error("can't deserialize line: {1}")]
31    DeserializeLine(#[source] csv::Error, String),
32    /// An I/O error occurred when reading a line.
33    #[error("can't read line {1} from {2}")]
34    ReadLine(#[source] io::Error, usize, String),
35    /// An I/O error occurred when seeking for an offset of a line.
36    #[error("can't seek for offset of line {1} in {2}")]
37    SeekOffset(#[source] io::Error, usize, String),
38    /// An offset conversion error occurred.
39    #[error("can't convert offset {0} of line {1} of {2}")]
40    IntoOffset(u64, usize, String),
41    /// An I/O error occurred when seeking in a file.
42    #[error("can't seek in {1}")]
43    Seek(#[source] io::Error, PathBuf),
44    /// An I/O error occurred when reading a line as an offset.
45    #[error("can't read line as offset {1} from {2}")]
46    ReadLineOffset(#[source] io::Error, u64, PathBuf),
47    /// A CSV error occurred.
48    #[error("can't decode csv line")]
49    Csv(#[from] csv::Error),
50    /// A CSV decoding error occurred with details.
51    #[error("can't decode csv line from `{file}` at {offset}: {line}")]
52    DecodeDetails {
53        /// The underlying cause of the error.
54        source: csv::Error,
55        /// The path to the file where the error occurred.
56        file: PathBuf,
57        /// The byte offset in the file where the error occurred.
58        offset: u64,
59        /// The content of the line where the error occurred.
60        line: String,
61    },
62    /// An error occurred when reading from the cache.
63    #[error("can't read cache from: {1}")]
64    ReadCache(#[source] serde_json::Error, PathBuf),
65    /// An error occurred when writing to the cache.
66    #[error("can't write cache into: {1}")]
67    WriteCache(#[source] serde_json::Error, PathBuf),
68}
69
70/// A `Result` alias where the `Err` case is `csv_partial_cache::Error`.
71pub type Result<T> = std::result::Result<T, Error>;
72
73/// A trait for items that can be created from a line and an offset.
74pub trait FromLineOffset: Sized {
75    /// The type of the offset.
76    type Offset: TryFrom<u64> + Into<u64>;
77
78    /// Returns the item's offset.
79    fn offset(&self) -> Self::Offset;
80    /// Creates an item from a line and an offset.
81    fn from_line_offset(line: &str, offset: Self::Offset) -> Result<Self>;
82}
83
/// A cache of lightweight per-row items built from a CSV file.
///
/// Each item is created from one data line of the CSV (see [`FromLineOffset`])
/// and remembers the byte offset of that line. The full record can therefore
/// be re-read from `path` on demand with [`CsvPartialCache::full_record`].
///
/// Items are kept in the order their lines appear in the file; nothing sorts
/// them. [`CsvPartialCache::find`] uses binary search, so lookups are only
/// meaningful when the CSV file is already sorted by the lookup key.
#[derive(Debug)]
pub struct CsvPartialCache<T> {
    /// Path of the CSV file the items were built from and are re-read from.
    pub path: PathBuf,
    /// The cached items, one per CSV data line.
    pub items: Box<[T]>,
}
92
/// Iterator yielding each line of a buffered reader together with the byte
/// offset at which that line starts.
///
/// `O` is the integer type offsets are converted into before being yielded.
#[derive(Debug)]
pub struct LineOffset<B, O> {
    /// The reader lines are pulled from.
    buf: B,
    /// Name of the source, used to label errors.
    buf_name: String,
    /// Zero-based index of the next line to be read.
    index: usize,
    /// Byte offset at which the next line starts.
    offset: u64,
    /// Type marker for `O`; no offset value is stored here.
    _offset_type: PhantomData<O>,
}
102
103impl<B: BufRead, O> LineOffset<B, O> {
104    /// Creates a new `LineOffset` from a buffer.
105    pub fn from_buf(name: impl Into<String>, buf: B) -> Self {
106        Self {
107            buf,
108            buf_name: name.into(),
109            index: 0,
110            offset: 0,
111            _offset_type: PhantomData,
112        }
113    }
114}
115
116impl<O> LineOffset<BufReader<File>, O> {
117    /// Creates a new `LineOffset` from a path.
118    pub fn from_path(path: &Path) -> Result<Self> {
119        let buf_name = path.to_string_lossy().to_string();
120        let f = File::open(path).map_err(|e| Error::OpenFile(e, path.into()))?;
121        let buf = BufReader::new(f);
122        Ok(Self::from_buf(buf_name, buf))
123    }
124}
125
#[cfg(test)]
impl<O> LineOffset<io::Cursor<Vec<u8>>, O> {
    /// Test helper: iterates over an owned copy of `s`, labelled `name` in errors.
    pub fn from_str(name: impl Into<String>, s: &str) -> Self {
        let bytes: Vec<u8> = s.to_owned().into_bytes();
        Self::from_buf(name, io::Cursor::new(bytes))
    }
}
134
135impl<B, O> Iterator for LineOffset<B, O>
136where
137    B: BufRead + Seek,
138    O: TryFrom<u64>,
139{
140    type Item = Result<(String, O)>;
141
142    fn next(&mut self) -> Option<Self::Item> {
143        let mut line = String::new();
144        match self.buf.read_line(&mut line) {
145            Ok(0) => return None,
146            Ok(_n) => {
147                if line.ends_with('\n') {
148                    line.pop();
149                    if line.ends_with('\r') {
150                        line.pop();
151                    }
152                }
153            }
154            Err(e) => {
155                return Some(Err(Error::ReadLine(
156                    e,
157                    self.index,
158                    self.buf_name.to_owned(),
159                )));
160            }
161        };
162        let cur_offset = match O::try_from(self.offset) {
163            Ok(o) => o,
164            Err(_) => {
165                return Some(Err(Error::IntoOffset(
166                    self.offset,
167                    self.index,
168                    self.buf_name.to_owned(),
169                )));
170            }
171        };
172        self.index += 1;
173        self.offset = match self.buf.stream_position() {
174            Ok(o) => o,
175            Err(e) => {
176                return Some(Err(Error::SeekOffset(
177                    e,
178                    self.index,
179                    self.buf_name.to_owned(),
180                )));
181            }
182        };
183        Some(Ok((line, cur_offset)))
184    }
185}
186
187impl<T> CsvPartialCache<T>
188where
189    T: FromLineOffset,
190{
191    /// Creates a new `CsvPartialCache` from a path.
192    pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
193        let path = path.into();
194        let mut items = Vec::new();
195        let mut index = LineOffset::from_path(&path)?;
196        index.next(); // skip the header
197        for row in index {
198            let (line, offset) = row?;
199            items.push(T::from_line_offset(&line, offset)?);
200        }
201        let items = items.into_boxed_slice();
202        Ok(Self { path, items })
203    }
204
205    /// Finds an item in the cache by a key.
206    pub fn find<B, F>(&self, b: &B, f: F) -> Option<&T>
207    where
208        F: FnMut(&T) -> B,
209        B: Ord,
210    {
211        self.items
212            .binary_search_by_key(b, f)
213            .map(|index| self.items.get(index).unwrap())
214            .ok()
215    }
216
217    /// Returns a CSV line by its ID.
218    async fn details_line(&self, row: &T) -> Result<String> {
219        use tokio::{
220            fs::File,
221            io::{AsyncBufReadExt, AsyncSeekExt, BufReader},
222        };
223
224        let mut f = File::open(&self.path)
225            .await
226            .map_err(|e| Error::OpenFile(e, self.path.to_owned()))?;
227        let offset = row.offset().into();
228        f.seek(SeekFrom::Start(offset))
229            .await
230            .map_err(|e| Error::Seek(e, self.path.to_owned()))?;
231        let mut buf = BufReader::new(f);
232        let mut line = String::new();
233        buf.read_line(&mut line)
234            .await
235            .map_err(|e| Error::ReadLineOffset(e, offset, self.path.to_owned()))?;
236        Ok(line)
237    }
238
239    /// Returns the full record by its ID, deserialized into `D`.
240    pub async fn full_record<D: DeserializeOwned>(&self, row: &T) -> Result<D> {
241        let line = self.details_line(row).await?;
242        csv_line::from_str::<D>(&line).map_err(|e| Error::DecodeDetails {
243            source: e,
244            file: self.path.clone(),
245            offset: row.offset().into(),
246            line,
247        })
248    }
249}
250
251// #[cfg(feature = "cache")]
252impl<T> CsvPartialCache<T>
253where
254    T: FromLineOffset + DeserializeOwned + Serialize,
255{
256    /// Creates an index using an intermediate JSON file for faster lookups.
257    pub fn from_cache(csv_path: impl Into<PathBuf>, cache_path: impl AsRef<Path>) -> Result<Self> {
258        let csv_path = csv_path.into();
259        let cache_path = cache_path.as_ref();
260        Ok(if is_cache_expired(&csv_path, cache_path)? {
261            let index = Self::new(csv_path)?;
262            Self::items_to_cache(&index.items, cache_path)?;
263            index
264        } else {
265            let items = Self::items_from_cache(cache_path)?;
266            Self {
267                path: csv_path,
268                items,
269            }
270        })
271    }
272
273    fn items_to_cache(items: &[T], cache_path: &Path) -> Result<()> {
274        let file = File::create(cache_path).map_err(|e| Error::CreateFile(e, cache_path.into()))?;
275        serde_json::to_writer(file, items).map_err(|e| Error::WriteCache(e, cache_path.into()))?;
276        Ok(())
277    }
278
279    fn items_from_cache(cache_path: &Path) -> Result<Box<[T]>> {
280        let file = File::open(cache_path).map_err(|e| Error::OpenFile(e, cache_path.into()))?;
281        let reader = BufReader::new(file);
282        let items: Vec<T> =
283            serde_json::from_reader(reader).map_err(|e| Error::ReadCache(e, cache_path.into()))?;
284        Ok(items.into_boxed_slice())
285    }
286}
287
288/// Checks if the cache is expired by comparing the modification times of the
289/// CSV and cache files.
290fn is_cache_expired(csv_path: &Path, cache_path: &Path) -> Result<bool> {
291    if !Path::new(cache_path).exists() {
292        return Ok(true);
293    }
294    let csv_modified = file_modified_at(csv_path)?;
295    let cache_modified = file_modified_at(cache_path)?;
296    Ok(cache_modified < csv_modified)
297}
298
299fn file_modified_at(path: &Path) -> Result<SystemTime> {
300    let meta = fs::metadata(path).map_err(|e| Error::ReadFileMetadata(e, path.into()))?;
301    meta.modified()
302        .map_err(|e| Error::GetFileModified(e, path.into()))
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn iteration() {
        let mut lines = LineOffset::from_str("noname", "foo\nbar\r\nbaz");
        // Offsets count every byte of the preceding lines, terminators included.
        for (text, offset) in [("foo", 0), ("bar", 4), ("baz", 9)] {
            let (line, at) = lines.next().unwrap().unwrap();
            assert_eq!((line.as_str(), at), (text, offset));
        }
        assert!(lines.next().is_none());
    }

    #[test]
    fn iteration_offset_overflow() {
        // The second line starts at byte 256, one past what `u8` can hold.
        let first = "x".repeat(255);
        let input = format!("{first}\nfoo");
        let mut lines = LineOffset::<_, u8>::from_str("noname", &input);
        let (line, offset) = lines.next().unwrap().unwrap();
        assert_eq!(line, first);
        assert_eq!(offset, 0);
        let second = lines.next().unwrap();
        assert!(matches!(second, Err(Error::IntoOffset(..))));
    }
}