1#![cfg_attr(docsrs, feature(doc_auto_cfg))]
2#![doc = include_str!("../README.md")]
3
4use std::{
5 fs::{self, File},
6 io::{self, BufRead, BufReader, Seek, SeekFrom},
7 marker::PhantomData,
8 path::{Path, PathBuf},
9 time::SystemTime,
10};
11
12use serde::{Serialize, de::DeserializeOwned};
13
14#[derive(Debug, thiserror::Error)]
16pub enum Error {
17 #[error("can't open file: {1}")]
19 OpenFile(#[source] std::io::Error, PathBuf),
20 #[error("can't create file: {1}")]
22 CreateFile(#[source] std::io::Error, PathBuf),
23 #[error("can't read file metadata: {1}")]
25 ReadFileMetadata(#[source] std::io::Error, PathBuf),
26 #[error("can't get file modification time: {1}")]
28 GetFileModified(#[source] std::io::Error, PathBuf),
29 #[error("can't deserialize line: {1}")]
31 DeserializeLine(#[source] csv::Error, String),
32 #[error("can't read line {1} from {2}")]
34 ReadLine(#[source] io::Error, usize, String),
35 #[error("can't seek for offset of line {1} in {2}")]
37 SeekOffset(#[source] io::Error, usize, String),
38 #[error("can't convert offset {0} of line {1} of {2}")]
40 IntoOffset(u64, usize, String),
41 #[error("can't seek in {1}")]
43 Seek(#[source] io::Error, PathBuf),
44 #[error("can't read line as offset {1} from {2}")]
46 ReadLineOffset(#[source] io::Error, u64, PathBuf),
47 #[error("can't decode csv line")]
49 Csv(#[from] csv::Error),
50 #[error("can't decode csv line from `{file}` at {offset}: {line}")]
52 DecodeDetails {
53 source: csv::Error,
55 file: PathBuf,
57 offset: u64,
59 line: String,
61 },
62 #[error("can't read cache from: {1}")]
64 ReadCache(#[source] serde_json::Error, PathBuf),
65 #[error("can't write cache into: {1}")]
67 WriteCache(#[source] serde_json::Error, PathBuf),
68}
69
70pub type Result<T> = std::result::Result<T, Error>;
72
73pub trait FromLineOffset: Sized {
75 type Offset: TryFrom<u64> + Into<u64>;
77
78 fn offset(&self) -> Self::Offset;
80 fn from_line_offset(line: &str, offset: Self::Offset) -> Result<Self>;
82}
83
/// In-memory index over a CSV file: the file's path plus one lightweight
/// item per indexed row.
#[derive(Debug)]
pub struct CsvPartialCache<T> {
    /// Path of the CSV file the items were built from; it is reopened when a
    /// full record is requested.
    pub path: PathBuf,
    /// Indexed rows, stored as a fixed-size boxed slice.
    pub items: Box<[T]>,
}
92
/// Line reader that remembers, for every line, the byte offset at which the
/// line starts. `O` is the offset type handed out to callers.
#[derive(Debug)]
pub struct LineOffset<B, O> {
    /// Underlying reader.
    buf: B,
    /// Human-readable name of the reader, used in error messages.
    buf_name: String,
    /// Index of the next line to be read (starts at 0).
    index: usize,
    /// Byte offset of the next line to be read (starts at 0).
    offset: u64,
    /// Ties the offset type `O` to the struct without storing a value of it.
    _offset_type: PhantomData<O>,
}
102
103impl<B: BufRead, O> LineOffset<B, O> {
104 pub fn from_buf(name: impl Into<String>, buf: B) -> Self {
106 Self {
107 buf,
108 buf_name: name.into(),
109 index: 0,
110 offset: 0,
111 _offset_type: PhantomData,
112 }
113 }
114}
115
116impl<O> LineOffset<BufReader<File>, O> {
117 pub fn from_path(path: &Path) -> Result<Self> {
119 let buf_name = path.to_string_lossy().to_string();
120 let f = File::open(path).map_err(|e| Error::OpenFile(e, path.into()))?;
121 let buf = BufReader::new(f);
122 Ok(Self::from_buf(buf_name, buf))
123 }
124}
125
#[cfg(test)]
impl<O> LineOffset<io::Cursor<Vec<u8>>, O> {
    /// Test helper: builds a reader over an in-memory copy of `s`.
    pub fn from_str(name: impl Into<String>, s: &str) -> Self {
        let bytes = s.to_owned().into_bytes();
        Self::from_buf(name, io::Cursor::new(bytes))
    }
}
134
135impl<B, O> Iterator for LineOffset<B, O>
136where
137 B: BufRead + Seek,
138 O: TryFrom<u64>,
139{
140 type Item = Result<(String, O)>;
141
142 fn next(&mut self) -> Option<Self::Item> {
143 let mut line = String::new();
144 match self.buf.read_line(&mut line) {
145 Ok(0) => return None,
146 Ok(_n) => {
147 if line.ends_with('\n') {
148 line.pop();
149 if line.ends_with('\r') {
150 line.pop();
151 }
152 }
153 }
154 Err(e) => {
155 return Some(Err(Error::ReadLine(
156 e,
157 self.index,
158 self.buf_name.to_owned(),
159 )));
160 }
161 };
162 let cur_offset = match O::try_from(self.offset) {
163 Ok(o) => o,
164 Err(_) => {
165 return Some(Err(Error::IntoOffset(
166 self.offset,
167 self.index,
168 self.buf_name.to_owned(),
169 )));
170 }
171 };
172 self.index += 1;
173 self.offset = match self.buf.stream_position() {
174 Ok(o) => o,
175 Err(e) => {
176 return Some(Err(Error::SeekOffset(
177 e,
178 self.index,
179 self.buf_name.to_owned(),
180 )));
181 }
182 };
183 Some(Ok((line, cur_offset)))
184 }
185}
186
187impl<T> CsvPartialCache<T>
188where
189 T: FromLineOffset,
190{
191 pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
193 let path = path.into();
194 let mut items = Vec::new();
195 let mut index = LineOffset::from_path(&path)?;
196 index.next(); for row in index {
198 let (line, offset) = row?;
199 items.push(T::from_line_offset(&line, offset)?);
200 }
201 let items = items.into_boxed_slice();
202 Ok(Self { path, items })
203 }
204
205 pub fn find<B, F>(&self, b: &B, f: F) -> Option<&T>
207 where
208 F: FnMut(&T) -> B,
209 B: Ord,
210 {
211 self.items
212 .binary_search_by_key(b, f)
213 .map(|index| self.items.get(index).unwrap())
214 .ok()
215 }
216
217 async fn details_line(&self, row: &T) -> Result<String> {
219 use tokio::{
220 fs::File,
221 io::{AsyncBufReadExt, AsyncSeekExt, BufReader},
222 };
223
224 let mut f = File::open(&self.path)
225 .await
226 .map_err(|e| Error::OpenFile(e, self.path.to_owned()))?;
227 let offset = row.offset().into();
228 f.seek(SeekFrom::Start(offset))
229 .await
230 .map_err(|e| Error::Seek(e, self.path.to_owned()))?;
231 let mut buf = BufReader::new(f);
232 let mut line = String::new();
233 buf.read_line(&mut line)
234 .await
235 .map_err(|e| Error::ReadLineOffset(e, offset, self.path.to_owned()))?;
236 Ok(line)
237 }
238
239 pub async fn full_record<D: DeserializeOwned>(&self, row: &T) -> Result<D> {
241 let line = self.details_line(row).await?;
242 csv_line::from_str::<D>(&line).map_err(|e| Error::DecodeDetails {
243 source: e,
244 file: self.path.clone(),
245 offset: row.offset().into(),
246 line,
247 })
248 }
249}
250
251impl<T> CsvPartialCache<T>
253where
254 T: FromLineOffset + DeserializeOwned + Serialize,
255{
256 pub fn from_cache(csv_path: impl Into<PathBuf>, cache_path: impl AsRef<Path>) -> Result<Self> {
258 let csv_path = csv_path.into();
259 let cache_path = cache_path.as_ref();
260 Ok(if is_cache_expired(&csv_path, cache_path)? {
261 let index = Self::new(csv_path)?;
262 Self::items_to_cache(&index.items, cache_path)?;
263 index
264 } else {
265 let items = Self::items_from_cache(cache_path)?;
266 Self {
267 path: csv_path,
268 items,
269 }
270 })
271 }
272
273 fn items_to_cache(items: &[T], cache_path: &Path) -> Result<()> {
274 let file = File::create(cache_path).map_err(|e| Error::CreateFile(e, cache_path.into()))?;
275 serde_json::to_writer(file, items).map_err(|e| Error::WriteCache(e, cache_path.into()))?;
276 Ok(())
277 }
278
279 fn items_from_cache(cache_path: &Path) -> Result<Box<[T]>> {
280 let file = File::open(cache_path).map_err(|e| Error::OpenFile(e, cache_path.into()))?;
281 let reader = BufReader::new(file);
282 let items: Vec<T> =
283 serde_json::from_reader(reader).map_err(|e| Error::ReadCache(e, cache_path.into()))?;
284 Ok(items.into_boxed_slice())
285 }
286}
287
288fn is_cache_expired(csv_path: &Path, cache_path: &Path) -> Result<bool> {
291 if !Path::new(cache_path).exists() {
292 return Ok(true);
293 }
294 let csv_modified = file_modified_at(csv_path)?;
295 let cache_modified = file_modified_at(cache_path)?;
296 Ok(cache_modified < csv_modified)
297}
298
299fn file_modified_at(path: &Path) -> Result<SystemTime> {
300 let meta = fs::metadata(path).map_err(|e| Error::ReadFileMetadata(e, path.into()))?;
301 meta.modified()
302 .map_err(|e| Error::GetFileModified(e, path.into()))
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    /// Lines ending in `\n`, `\r\n` or end of input are yielded without their
    /// terminator, each paired with the byte offset where it starts.
    #[test]
    fn iteration() {
        let mut lines = LineOffset::<_, u64>::from_str("noname", "foo\nbar\r\nbaz");
        for expected in [("foo", 0), ("bar", 4), ("baz", 9)] {
            let (text, offset) = lines.next().unwrap().unwrap();
            assert_eq!((text.as_str(), offset), expected);
        }
        assert!(lines.next().is_none());
    }

    /// With `u8` offsets, a line starting at byte 256 cannot be represented
    /// and is reported as an `IntoOffset` error.
    #[test]
    fn iteration_offset_overflow() {
        let long_line = "x".repeat(255);
        let input = format!("{long_line}\nfoo");
        let mut lines = LineOffset::<_, u8>::from_str("noname", &input);

        let first = lines.next().unwrap().unwrap();
        assert_eq!(first, (long_line, 0));

        let second = lines.next().unwrap();
        assert!(matches!(second, Err(Error::IntoOffset(..))));
    }
}