#[cfg(unix)] use std::os::fd::RawFd;
use core::cell::RefCell;
use core::mem::MaybeUninit;
use crate::fail;
use crate::re::Re;
use fomat_macros::{fomat, pintln};
use memchr::{memchr, memrchr};
use memmap2::{Mmap, MmapOptions, MmapMut};
use std::ffi;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::ptr::null_mut;
use std::str::from_utf8_unchecked;
/// Iterator over the non-empty, `\n`-separated lines of a byte slice.
///
/// Only `lines[head..tail]` remains to be visited: iterating from the front advances
/// `head`, iterating from the back lowers `tail`. The fields are public so callers can
/// rewind or reposition the window directly.
pub struct LinesIt<'a> {
  /// The whole buffer being split; never modified.
  pub lines: &'a [u8],
  /// Start (inclusive) of the unvisited window.
  pub head: usize,
  /// End (exclusive) of the unvisited window.
  pub tail: usize}
impl<'a> LinesIt<'a> {
  /// Creates an iterator over all of `lines`, with the window already trimmed of
  /// leading and trailing `\n` bytes (an all-`\n` input yields an empty window).
  pub fn new (lines: &'a [u8]) -> LinesIt<'a> {
    let head = lines.iter().position (|&ch| ch != b'\n') .unwrap_or (lines.len());
    // Last non-LF byte at or after `head`; with none, the window collapses onto `head`.
    let tail = lines[head..].iter().rposition (|&ch| ch != b'\n') .map_or (head, |last| head + last + 1);
    LinesIt {lines, head, tail}}

  /// Creates an iterator whose window starts at the last `\n` strictly before byte
  /// offset `pos` (or at 0 when there is none) and runs to the end of `lines`, so the
  /// first line yielded is the one `pos` falls into. An offset past the end of `lines`
  /// gives an empty iterator.
  pub fn heads_up (lines: &'a [u8], pos: usize) -> LinesIt<'a> {
    let tail = lines.len();
    let head = match lines.get (..pos) {
      Some (before) => memrchr (b'\n', before) .unwrap_or (0),
      None => tail};
    LinesIt {lines, head, tail}}}
impl<'a> Iterator for LinesIt<'a> {
  type Item = &'a [u8];
  /// Yields the next non-empty line from the front of the window, without its `\n`.
  fn next (&mut self) -> Option<Self::Item> {
    let lines = self.lines;
    // Step over any run of empty lines.
    while self.head < self.tail && lines[self.head] == b'\n' {self.head += 1}
    if self.head >= self.tail {return None}
    let start = self.head;
    let rest = &lines[start .. self.tail];
    let (line, next_head) = match memchr (b'\n', rest) {
      Some (lf) => (&rest[..lf], start + lf + 1),
      // Final line of the window has no terminator.
      None => (rest, self.tail)};
    self.head = next_head;
    Some (line)}}
impl<'a> DoubleEndedIterator for LinesIt<'a> {
  /// Yields the next non-empty line from the back of the window, without its `\n`.
  fn next_back (&mut self) -> Option<Self::Item> {
    let lines = self.lines;
    // Drop trailing empty lines.
    while self.head < self.tail && lines[self.tail - 1] == b'\n' {self.tail -= 1}
    if self.head >= self.tail {return None}
    let window = &lines[self.head .. self.tail];
    match memrchr (b'\n', window) {
      Some (lf) => {
        // The line is everything after the last LF; the window now ends at that LF.
        let line = &window[lf + 1 ..];
        self.tail = self.head + lf;
        Some (line)}
      None => {
        // No LF left: the whole window is the last line.
        self.tail = self.head;
        Some (window)}}}}
/// Advisory lock on an open file, released when dropped.
///
/// Only the raw descriptor is kept, not the file, and unlocking goes through that raw
/// value — so the lock should be dropped while the file is still open.
#[cfg(not(windows))]
pub struct Lock {
  /// Descriptor the `flock` was placed on.
  fd: i32}
/// Advisory lock on an open file, released when dropped.
///
/// Only the raw handle is kept, not the file, and unlocking goes through that raw
/// value — so the lock should be dropped while the file is still open.
#[cfg(windows)]
pub struct Lock {
  /// Handle the `LockFileEx` lock was placed on.
  handle: std::os::windows::io::RawHandle}
// SAFETY: the lock only stores the descriptor/handle value and hands it back to the OS;
// needed because `RawHandle` is a raw pointer and therefore not automatically Send/Sync.
unsafe impl Send for Lock {}
unsafe impl Sync for Lock {}
/// Places a non-blocking advisory lock on the whole of `file` (Windows).
///
/// `ex` requests an exclusive lock, otherwise a shared one. On failure returns the
/// Win32 error code from `GetLastError`, or 33 (`ERROR_LOCK_VIOLATION`) if that reports 0.
#[cfg(windows)]
pub fn lock (file: &fs::File, ex: bool) -> Result<Lock, u32> {
  use std::os::windows::io::AsRawHandle;
  use winapi::um::errhandlingapi::GetLastError;
  use winapi::um::fileapi::LockFileEx;
  use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY};
  let mut flags = LOCKFILE_FAIL_IMMEDIATELY;
  if ex {flags |= LOCKFILE_EXCLUSIVE_LOCK}
  let handle = file.as_raw_handle();
  // SAFETY: an all-zero OVERLAPPED is valid (offset 0, no event) and `handle` is borrowed
  // from the live `file`.
  let rc = unsafe {
    let mut overlapped = MaybeUninit::zeroed().assume_init();
    // Lock the maximal range (low = high = !0) from offset 0, i.e. the whole file including
    // future growth. A zero-byte range offers no reliable exclusion between lockers.
    // `Drop for Lock` must unlock exactly this range.
    LockFileEx (handle, flags, 0, !0, !0, &mut overlapped)};
  if rc == 0 {
    Err (match unsafe {GetLastError()} {0 => 33, errno => errno})
  } else {Ok (Lock {handle})}}
impl Drop for Lock {
  /// Releases the whole-file range taken by `lock` (Windows). A failure cannot be
  /// reported from `drop` and is ignored.
  #[cfg(windows)]
  fn drop (&mut self) {
    use winapi::um::fileapi::UnlockFileEx;
    // SAFETY: an all-zero OVERLAPPED means offset 0; the range mirrors the one locked above.
    unsafe {
      let mut overlapped = MaybeUninit::zeroed().assume_init();
      let _ = UnlockFileEx (self.handle, 0, !0, !0, &mut overlapped);}}
  /// Releases the `flock` (non-Windows). A failure cannot be reported from `drop`
  /// and is ignored.
  #[cfg(not(windows))]
  fn drop (&mut self) {
    // SAFETY: `flock` takes the descriptor by value and touches no Rust memory.
    // errno is deliberately not inspected: `__errno_location` only exists on Linux-like libcs.
    let _ = unsafe {libc::flock (self.fd, libc::LOCK_UN)};}}
#[cfg(not(windows))]
pub fn lock (file: &fs::File, ex: bool) -> Result<Lock, u32> {
use std::os::unix::io::AsRawFd;
let mut flags = libc::LOCK_NB;
if ex {flags |= libc::LOCK_EX} else {flags |= libc::LOCK_SH}
unsafe {
let fd = file.as_raw_fd();
let rc = libc::flock (fd, flags);
if rc == -1 {Err (*libc::__errno_location() as u32)}
else {Ok (Lock {fd})}}}
/// A file that is opened, locked and memory-mapped together, optionally starting with a
/// fixed header.
///
/// Field order is load-bearing. Fields drop in declaration order, so the lock is released
/// through its raw descriptor and the mapping is removed before `file` is closed.
pub struct LockAndLoad {
  /// Expected leading bytes of the file; `bulk()` skips them.
  pub header: &'static [u8],
  /// Advisory lock on `file`, held for as long as this value lives.
  pub lock: Lock,
  /// Read-only mapping of the file contents, header included.
  pub mmap: Mmap,
  /// The underlying open file.
  pub file: fs::File}
impl LockAndLoad {
  /// Opens `path`, locks it (exclusively when `ex`) and maps it into memory.
  ///
  /// When `ex` is set, the file is opened read-write and created if missing. An empty
  /// file then gets `header` written into it. A non-empty `header` must match the first
  /// bytes of the file, otherwise this fails with "unexpected header".
  pub fn open (path: &dyn AsRef<Path>, ex: bool, header: &'static [u8]) -> Re<LockAndLoad> {
    let path: &Path = path.as_ref();
    let mut file = fs::OpenOptions::new() .read (true) .write (ex) .create (ex) .open (path)?;
    let lock = lock (&file, ex)?;
    // SAFETY: the file is locked by us; writers that ignore the advisory lock could
    // still change the mapped bytes underneath.
    let mut mmap = unsafe {MmapOptions::new().map (&file)?};
    if !header.is_empty() {
      if ex && mmap.is_empty() {
        // Fresh file: stamp the header, then remap so the view includes it.
        file.write_all (header)?;
        mmap = unsafe {MmapOptions::new().map (&file)?}}
      if !mmap.starts_with (header) {
        fail! ([path] ": unexpected header")}}
    Re::Ok (LockAndLoad {header, lock, mmap, file})}
  /// Exclusive, read-write (creating) variant of [`LockAndLoad::open`].
  pub fn ex (path: &dyn AsRef<Path>, header: &'static [u8]) -> Re<LockAndLoad> {
    LockAndLoad::open (path, true, header)}
  /// Shared, read-only variant of [`LockAndLoad::open`].
  pub fn rd (path: &dyn AsRef<Path>, header: &'static [u8]) -> Re<LockAndLoad> {
    LockAndLoad::open (path, false, header)}
  /// The mapped contents following the header.
  pub fn bulk (&self) -> &[u8] {
    let skip = self.header.len();
    &self.mmap[skip..]}
  /// Non-empty `\n`-separated lines of `bulk()`, front to back.
  pub fn lines (&self) -> impl Iterator<Item=&[u8]> {
    self.bulk().split (|&ch| ch == b'\n') .filter (|line| !line.is_empty())}
  /// Double-ended line iterator spanning the whole of `bulk()`.
  pub fn iter (&self) -> LinesIt {
    let lines = self.bulk();
    LinesIt {lines, head: 0, tail: lines.len()}}
  /// Line iterator over `bulk()` positioned at byte offset `pos`; see `LinesIt::heads_up`.
  pub fn heads_up (&self, pos: usize) -> LinesIt {
    LinesIt::heads_up (self.bulk(), pos)}}
/// Escapes `fr` into `push` using byte `1` as the escape introducer.
///
/// The mapping is `1` → `1 1`, `\t` → `1 7`, `\n` → `1 3`, `\r` → `1 4` and `"` → `1 5`.
/// Every other byte, including `,` and NUL, passes through unchanged. `csunesc` reverses it.
pub fn csesct<P> (fr: &[u8], mut push: P) where P: FnMut (u8) {
  for &byte in fr {
    let code = match byte {
      1 => 1,
      b'\t' => 7,
      b'\n' => 3,
      b'\r' => 4,
      b'"' => 5,
      plain => {push (plain); continue}};
    push (1);
    push (code)}}
/// Escapes `fr` into `push` using byte `1` as the escape introducer.
///
/// The mapping is `1` → `1 1`, `\n` → `1 3`, `\r` → `1 4`, `"` → `1 5` and `,` → `1 6`.
/// Tab and NUL pass through unchanged. `csunesc` reverses it.
pub fn csesc0<P> (fr: &[u8], mut push: P) where P: FnMut (u8) {
  for &byte in fr {
    let code = match byte {
      1 => 1,
      b'\n' => 3,
      b'\r' => 4,
      b'"' => 5,
      b',' => 6,
      plain => {push (plain); continue}};
    push (1);
    push (code)}}
/// Escapes `fr` into `push` using byte `1` as the escape introducer.
///
/// The mapping is `1` → `1 1`, NUL → `1 2`, `\n` → `1 3`, `\r` → `1 4`, `"` → `1 5` and
/// `,` → `1 6`. Tab passes through unchanged. `csunesc` reverses it.
pub fn csesc<P> (fr: &[u8], mut push: P) where P: FnMut (u8) {
  for &byte in fr {
    let code = match byte {
      1 => 1,
      0 => 2,
      b'\n' => 3,
      b'\r' => 4,
      b'"' => 5,
      b',' => 6,
      plain => {push (plain); continue}};
    push (1);
    push (code)}}
/// Reverses `csesc`, `csesc0` and `csesct`, pushing the decoded bytes into `push`.
///
/// `1 1` → `1`, `1 2` → NUL, `1 3` → `\n`, `1 4` → `\r`, `1 5` → `"`, `1 6` → `,` and
/// `1 7` → `\t`. A lone `1` at the very end is kept as-is. A `1` followed by any other
/// byte drops both bytes.
pub fn csunesc<P> (fr: &[u8], mut push: P) where P: FnMut (u8) {
  let mut bytes = fr.iter().copied();
  while let Some (byte) = bytes.next() {
    if byte != 1 {push (byte); continue}
    match bytes.next() {
      None => push (1),
      Some (1) => push (1),
      Some (2) => push (0),
      Some (3) => push (b'\n'),
      Some (4) => push (b'\r'),
      Some (5) => push (b'"'),
      Some (6) => push (b','),
      Some (7) => push (b'\t'),
      Some (_) => ()}}}
/// Unit tests and a seek benchmark for `LinesIt` (only built with the `nightly` feature).
#[cfg(all(test, feature = "nightly"))] mod test {
extern crate test;
// NOTE(review): `pintln` looks unused in this module.
use fomat_macros::pintln;
use super::*;
/// Two JSON objects, one per line, with a trailing newline.
const JSON_LINES: &'static str = concat! (
r#"{"foo": 1}"#, '\n',
r#"{"bar": 2}"#, '\n');
/// Header, a row, two blank lines and a final row without a trailing newline: exercises
/// skipping of empty lines and an unterminated last line.
const CSV: &'static str = concat! (
"foo,bar\n",
"foo,1\n",
"\n\n", "bar,2");
/// Iterating from the back yields lines in reverse, then both ends report exhaustion.
#[test] fn back() {
let mut it = LinesIt::new (JSON_LINES.as_bytes());
assert_eq! (it.next_back().unwrap(), br#"{"bar": 2}"#);
assert_eq! (it.next_back().unwrap(), br#"{"foo": 1}"#);
assert_eq! (it.next_back(), None);
assert_eq! (it.next(), None);
it = LinesIt::new (CSV.as_bytes());
assert_eq! (it.next_back().unwrap(), b"bar,2");
assert_eq! (it.next_back().unwrap(), b"foo,1");
assert_eq! (it.next_back().unwrap(), b"foo,bar");
assert_eq! (it.next_back(), None);
assert_eq! (it.next(), None);}
/// Iterating from the front yields lines in order, then both ends report exhaustion.
#[test] fn forward() {
let mut it = LinesIt::new (JSON_LINES.as_bytes());
assert_eq! (it.next().unwrap(), br#"{"foo": 1}"#);
assert_eq! (it.next().unwrap(), br#"{"bar": 2}"#);
assert_eq! (it.next(), None);
assert_eq! (it.next_back(), None);
it = LinesIt::new (CSV.as_bytes());
assert_eq! (it.next().unwrap(), b"foo,bar");
assert_eq! (it.next().unwrap(), b"foo,1");
assert_eq! (it.next().unwrap(), b"bar,2");
assert_eq! (it.next(), None);
assert_eq! (it.next_back(), None)}
/// Front and back iteration meeting in the middle never yield a line twice.
#[test] fn meet() {
let mut it = LinesIt::new (JSON_LINES.as_bytes());
assert_eq! (it.next().unwrap(), br#"{"foo": 1}"#);
assert_eq! (it.next_back().unwrap(), br#"{"bar": 2}"#);
assert_eq! (it.next_back(), None);
assert_eq! (it.next(), None);
// Offset 8 skips the "foo,bar\n" header.
it = LinesIt::new (& CSV.as_bytes() [8..]);
assert_eq! (it.next().unwrap(), b"foo,1");
assert_eq! (it.next_back().unwrap(), b"bar,2");
assert_eq! (it.next_back(), None);
assert_eq! (it.next(), None)}
/// Cost of `LinesIt::heads_up` + `next`, cycling `ix` over every byte offset of `CSV`
/// and checking the line found at that offset.
#[bench] fn seek (bm: &mut test::Bencher) {
let mut ix = 0;
bm.iter (|| {
let mut it = LinesIt::heads_up (CSV.as_bytes(), ix);
let line = it.next().unwrap();
let expected = match ix {
0 ..= 7 => b"foo,bar" as &[u8],
8 ..= 13 => b"foo,1",
14 ..= 33 => b"bar,2",
_ => unreachable!()};
assert_eq! (line, expected);
ix += 1;
// Wrap around once every offset of the buffer has been tried.
if it.lines.len() <= ix {ix = 0}})}}
/// SQLite helpers (enabled by the `sqlite` feature): PRAGMA builders, a self-owning row
/// cursor `SqRows`, and the `sq!` / `se!` query macros.
#[cfg(feature = "sqlite")] pub mod sq {
use core::cell::UnsafeCell;
use core::mem::{transmute, ManuallyDrop};
use crate::re::Re;
use fomat_macros::fomat;
use reffers::rc1::Strong as Strong1;
use reffers::rc2::Strong as Strong2;
use rusqlite::{CachedStatement, Connection, Rows};
/// Builds PRAGMAs that put `schema` into WAL journaling with `journal_size_limit` at
/// 1048576 bytes, and set `wal_autocheckpoint = 0`.
/// NOTE: the `wal_autocheckpoint` PRAGMA is emitted without the `schema` prefix.
pub fn sqwal (schema: &str) -> String {fomat! (
"PRAGMA " (schema) ".journal_mode = WAL;"
"PRAGMA " (schema) ".journal_size_limit = 1048576;" "PRAGMA wal_autocheckpoint = 0;")}
/// Builds tuning PRAGMAs for `schema`: 16 KiB pages, incremental auto-vacuum and
/// `synchronous = NORMAL`, plus an unprefixed `temp_store = MEMORY`.
pub fn sqtune (schema: &str) -> String {fomat! (
"PRAGMA " (schema) ".page_size = 16384;"
"PRAGMA " (schema) ".auto_vacuum = INCREMENTAL;" "PRAGMA " (schema) ".synchronous = NORMAL;"
"PRAGMA temp_store = MEMORY;")}
/// A query cursor that keeps its cached statement and connection alive by holding
/// strong references to them. Lifetimes are erased to `'static`; soundness relies on
/// `Drop` releasing the fields in the order rows → statement → connection.
pub struct SqRows {
// `rows` borrows from `sth`, which borrows from `db`; `UnsafeCell` lets `r()` hand out
// `&mut Rows` from `&self`.
rows: ManuallyDrop<UnsafeCell<Rows<'static>>>, sth: ManuallyDrop<Strong1<CachedStatement<'static>>>,
db: ManuallyDrop<Strong2<Connection>>}
impl SqRows {
/// Wraps already lifetime-erased parts; normally called from the `sq!` macro.
pub fn new (rows: Rows<'static>, sth: Strong1<CachedStatement<'static>>, db: Strong2<Connection>) -> SqRows {SqRows {
rows: ManuallyDrop::new (UnsafeCell::new (rows)),
sth: ManuallyDrop::new (sth),
db: ManuallyDrop::new (db)}}
/// Mutable access to the underlying `Rows`, with the lifetime narrowed back to `&self`.
/// NOTE(review): this returns `&mut` from `&self`; holding two results at once would be
/// aliasing UB, so callers must not overlap them.
pub fn r<'a> (&'a self) -> &mut Rows<'a> {
unsafe {transmute (&mut *self.rows.get())}}
/// Advances to the next row and reads `column` as `i32`; 0 when there is no row or the value is NULL.
pub fn z32 (&self, column: usize) -> Re<i32> {
let Some (row) = self.r().next()? else {return Re::Ok (0)};
Re::Ok (row.get::<_, Option<i32>> (column)? .unwrap_or (0))}
/// Advances to the next row and reads `column` as `i64`; 0 when there is no row or the value is NULL.
pub fn z64 (&self, column: usize) -> Re<i64> {
let Some (row) = self.r().next()? else {return Re::Ok (0)};
Re::Ok (row.get::<_, Option<i64>> (column)? .unwrap_or (0))}
/// Advances to the next row and reads `column` as `f32`; NaN when there is no row or the value is NULL.
pub fn f32 (&self, column: usize) -> Re<f32> {
let Some (row) = self.r().next()? else {return Re::Ok (f32::NAN)};
Re::Ok (row.get::<_, Option<f32>> (column)? .unwrap_or (f32::NAN))}}
impl Drop for SqRows {
/// Drops the borrowing parts first: rows, then the statement, then the connection.
fn drop (&mut self) {unsafe {
ManuallyDrop::drop (&mut self.rows); ManuallyDrop::drop (&mut self.sth); ManuallyDrop::drop (&mut self.db)}}}
/// `sq!(db, params, fomat-args…)`: formats the SQL, prepares it through the statement cache,
/// runs it with `params` and returns an owning `SqRows`.
/// NOTE(review): `b2s` must be in scope at the call site, and `$db` is expected to support
/// `prepare_cached` and `try_get_strong` (presumably a reffers rc2 handle) — confirm.
#[macro_export] macro_rules! sq {
($db: expr, $params: expr, $($fq: tt)+) => {{
// SQL text is built in a 256-byte inline buffer (spills to the heap if longer).
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
fomat_macros::wite! (&mut buf, $($fq)+)?;
let mut sth = reffers::rc1::RefMut::new ($db.prepare_cached (b2s (&buf))?);
let rows = sth.query ($params)?;
// Erase lifetimes so rows, statement and connection can travel together in `SqRows`.
unsafe {$crate::lines::sq::SqRows::new (
core::mem::transmute (rows),
core::mem::transmute (sth.get_strong()),
$db.try_get_strong()?)}}}}
/// `se!(db, params, fomat-args…)`: formats and executes a statement, yielding the value
/// returned by `execute` (the number of changed rows).
/// `se!(e expect, db, params, …)`: same, but fails unless exactly `expect` rows changed.
/// NOTE(review): the recursive `se!` and the `fail!` used here are unqualified and resolve
/// at the call site.
#[macro_export] macro_rules! se {
($db: expr, $params: expr, $($fq: tt)+) => {{
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
fomat_macros::wite! (&mut buf, $($fq)+)?;
let mut sth = $db.prepare_cached (b2s (&buf))?;
sth.execute ($params)?}};
(e $expect: expr, $db: expr, $params: expr, $($fq: tt)+) => {{
let expect = $expect as usize;
let ups = se! ($db, $params, $($fq)+);
if ups != expect {fail! ("ups " (ups) " <> " (expect) " expect")}}}}
}
/// `csq`: a read-only SQLite virtual table over a comma-separated file (enabled by the
/// `sqlite` feature). The first non-empty line of the file names the columns; every
/// column is declared `TEXT NOT NULL`.
#[cfg(feature = "sqlite")] pub mod csq {
use core::ffi::c_int;
use core::marker::PhantomData;
use core::str::from_utf8;
use crate::lines::Re;
use crate::log;
use fomat_macros::{fomat, wite};
use rusqlite::{vtab, Connection, Error, Result};
use rusqlite::ffi::{sqlite3_vtab, sqlite3_vtab_cursor};
use rusqlite::types::Value;
use smallvec::SmallVec;
use std::fmt::Write;
use std::rc::Rc;
use super::{LockAndLoad, LinesIt};
/// Virtual table instance: the locked, mapped CSV file.
/// `base` must stay the first field (`repr(C)`) so a pointer to this struct is also a
/// valid `sqlite3_vtab` pointer.
#[repr(C)]
pub struct CsqVTab {
base: sqlite3_vtab,
ll: LockAndLoad}
/// Scan cursor borrowing the table's mapped bytes for `'vt`.
/// `base` must stay the first field (`repr(C)`) for the same reason as in `CsqVTab`.
#[repr(C)]
pub struct CsvVTabCursor<'vt> {
base: sqlite3_vtab_cursor,
// Remaining lines of the file.
it: LinesIt<'vt>,
// Comma-separated fields of the current row (no quoting or unescaping is applied).
cols: SmallVec<[&'vt [u8]; 8]>,
// Number of data rows produced so far; 0 means the header has not been skipped yet.
rowid: usize,
eof: bool,
phantom: PhantomData<&'vt CsqVTab>}
unsafe impl vtab::VTabCursor for CsvVTabCursor<'_> {
/// Starts a full scan: every constraint is ignored, the cursor rewinds to the start
/// of the file and the first data row is loaded.
fn filter (&mut self, _idx_num: c_int, _idx_str: Option<&str>, _args: &vtab::Values<'_>) -> Result<()> {
self.it.head = 0;
self.rowid = 0;
self.eof = false;
self.next()}
/// Loads the next data row, skipping the header line on the first call after `filter`.
fn next (&mut self) -> Result<()> {
if 0 == self.rowid { if self.it.next().is_none() {self.eof = true}}
if let Some (row) = self.it.next() {
self.cols.clear();
for col in row.split (|ch| *ch == b',') {self.cols.push (col)}
} else {self.eof = true}
if self.eof {return Ok(())}
self.rowid += 1;
Ok(())}
fn eof (&self) -> bool {
self.eof}
/// Returns field `col` of the current row: as TEXT when valid UTF-8, otherwise as a BLOB.
/// A row with fewer fields than the header makes reading the missing columns an error.
fn column (&self, ctx: &mut vtab::Context, col: c_int) -> Result<()> {
if col < 0 || self.cols.len() as c_int <= col {return Err (Error::ModuleError (fomat! ("csq] " [=col])))}
let col = self.cols[col as usize];
if let Ok (ustr) = from_utf8 (col) {
ctx.set_result (&ustr)
} else {
ctx.set_result (&col)}}
/// 1-based index of the current data row.
fn rowid (&self) -> Result<i64> {
Ok (self.rowid as i64)}}
unsafe impl<'vtab> vtab::VTab<'vtab> for CsqVTab {
type Aux = ();
type Cursor = CsvVTabCursor<'vtab>;
/// Handles `CREATE VIRTUAL TABLE … USING csq (path=…)`.
/// `args[0..3]` are module, database and table names; module arguments start at `args[3]`.
/// The file is opened with a shared lock, and its first non-empty line provides the
/// column names.
fn connect (db: &mut vtab::VTabConnection, _aux: Option<&()>, args: &[&[u8]]) -> Result<(String, CsqVTab)> {
if args.len() < 4 {return Err (Error::ModuleError ("csq] !path".into()))}
let mut ll = None;
let argsʹ = &args[3..];
for c_slice in argsʹ {
let (param, value) = vtab::parameter (c_slice)?;
match param {
// `path=…` is the only supported parameter; a repeated `path` replaces the earlier one.
"path" => ll = Some (match LockAndLoad::rd (&value, b"") {Re::Ok (k) => k, Re::Err (err) => {
return Err (Error::ModuleError (fomat! ("csq] " (err))))}}),
_ => return Err (Error::ModuleError (fomat! ("csq] unrecognized " [=param])))}}
let Some (ll) = ll else {return Err (Error::ModuleError ("csq] !path".into()))};
let Some (hdr) = ll.lines().next() else {return Err (Error::ModuleError ("csq] !head".into()))};
let Ok (tname) = from_utf8 (&args[2]) else {return Err (Error::ModuleError ("csq] tname!utf8".into()))};
let tname = vtab::escape_double_quote (tname.trim());
let mut schema = String::with_capacity (123);
let _ = wite! (&mut schema, "CREATE TABLE \"" (tname) "\" (");
for (col, cn) in hdr.split (|ch| *ch == b',') .zip (0..) {
let Ok (col) = from_utf8 (col) else {return Err (Error::ModuleError ("csq] head!utf8".into()))};
let col = vtab::escape_double_quote (col);
// Separator before every column but the first.
let _ = wite! (&mut schema, if cn != 0 {", "} '"' (col) "\" TEXT NOT NULL");}
schema.push_str (");");
let vtab = CsqVTab {base: sqlite3_vtab::default(), ll};
// DirectOnly: SQLite refuses to use this table from triggers and views.
db.config (vtab::VTabConfig::DirectOnly)?;
Ok ((schema, vtab))}
/// Every query is a full scan: no constraints are consumed and the cost is a flat 1e6.
fn best_index (&self, info: &mut vtab::IndexInfo) -> Result<()> {
info.set_estimated_cost (1_000_000.);
Ok(())}
/// Creates a cursor over the table's mapped file; `filter` positions it.
fn open (&mut self) -> Result<CsvVTabCursor<'_>> {
Ok (CsvVTabCursor {
base: sqlite3_vtab_cursor::default(),
it: self.ll.iter(),
cols: SmallVec::new(),
rowid: 0,
eof: false,
phantom: PhantomData})}}
// `VTabKind::Default`: presumably a regular (non-eponymous) module, so tables must be
// created via CREATE VIRTUAL TABLE — confirm against rusqlite docs.
impl vtab::CreateVTab<'_> for CsqVTab {
const KIND: vtab::VTabKind = vtab::VTabKind::Default;}
/// Registers the read-only `csq` module on `db`.
pub fn csq_load (db: &Connection) -> Re<()> {
db.create_module ("csq", vtab::read_only_module::<CsqVTab>(), None)?;
Re::Ok(())}
/// Proof of concept: mounts `path` as `vtab` in an in-memory database and logs its
/// schema, column info, row count and every value.
/// NOTE(review): `path` is interpolated into the SQL unescaped — only use with trusted input.
pub fn csq_poc (path: &str) -> Re<()> {
let db = Connection::open_in_memory()?;
db.create_module ("csq", vtab::read_only_module::<CsqVTab>(), None)?;
let sql = fomat! ("CREATE VIRTUAL TABLE vtab USING csq (path=" (path) ")");
db.execute_batch (&sql)?;
let schema = db.query_row ("SELECT sql FROM sqlite_schema WHERE name = 'vtab'", [], |row| row.get::<_, String> (0))?;
log! ("schema: " (schema));
let mut columns = 0;
for row in db.prepare ("SELECT * FROM pragma_table_info ('vtab')")? .query_map ([], |row| {
let cid = row.get::<_, i32> (0)?;
let name = row.get::<_, Rc<str>> (1)?;
let ty = row.get::<_, Rc<str>> (2)?;
let notnull = row.get::<_, bool> (3)?;
log! ("column " (cid) ": " [=name] ' ' [=ty] ' ' [=notnull]);
columns += 1;
Ok(())})? {row?}
let rows = db.query_row ("SELECT COUNT(*) FROM vtab", [], |row| row.get::<_, i32> (0))?;
log! ([=rows]);
for row in db.prepare ("SELECT rowid, * FROM vtab")? .query_map ([], |row| {
let rowid = row.get::<_, u32> (0)?;
// Column 0 is the rowid; data columns follow.
for col in 0..columns {
let val = row.get::<_, Value> (1 + col)?;
log! ((rowid) ' ' [=val])}
Ok(())})? {row?}
Re::Ok(())}}
/// Error-path tests for the `csq` virtual table.
#[cfg(all(test, feature = "nightly", feature = "sqlite"))] mod csq_test {
  use super::csq::csq_load;
  use rusqlite::Connection;

  /// A missing `path` must make table creation fail with an error that carries both the
  /// `csq] ` prefix and the underlying OS error.
  #[test] fn no_such_file() {
    let db = Connection::open_in_memory().unwrap();
    csq_load (&db) .unwrap();
    let outcome = db.execute_batch ("CREATE VIRTUAL TABLE vt USING csq (path=/no/such/file)");
    assert! (outcome.is_err());
    let dump = format! ("{:?}", outcome);
    for needle in ["csq] lines:", "(os error "] {
      assert! (dump.contains (needle))}}}
/// Benchmarks for the `csq` virtual table over generated CSV files (nightly + sqlite).
#[cfg(all(test, feature = "nightly", feature = "sqlite"))] mod csq_bench {
extern crate test;
use std::io::Write;
use std::rc::Rc;
/// Writes `num` lines `foo,bar,{i}` to `name`, each followed by an extra blank line.
/// The first line (`foo,bar,0`) becomes the table header, so data rows start at `i = 1`.
fn gen (name: &str, num: i32) {
let mut file = std::io::BufWriter::new (std::fs::File::create (name) .unwrap());
for i in 0..num {writeln! (&mut file, "foo,bar,{}\n", i) .unwrap()}
// Flush explicitly: dropping a `BufWriter` silently discards write errors.
file.flush() .unwrap()}
/// Cost of creating the virtual table: opening, locking and mapping the file and
/// parsing its header.
#[bench] fn csq_open (bm: &mut test::Bencher) {
gen ("foobar1.csv", 12345);
bm.iter (|| {
let db = rusqlite::Connection::open_in_memory().unwrap();
super::csq::csq_load (&db) .unwrap();
db.execute_batch ("CREATE VIRTUAL TABLE vt USING csq (path=foobar1.csv)") .unwrap()});
std::fs::remove_file ("foobar1.csv") .unwrap()}
/// Cost of running a prepared `LIMIT 1` query and reading the first data row.
#[bench] fn csq_select_one (bm: &mut test::Bencher) {
let db = rusqlite::Connection::open_in_memory().unwrap();
super::csq::csq_load (&db) .unwrap();
gen ("foobar2.csv", 12345);
db.execute_batch ("CREATE VIRTUAL TABLE vt USING csq (path=foobar2.csv)") .unwrap();
let mut st = db.prepare ("SELECT * FROM vt LIMIT 1") .unwrap();
bm.iter (|| {
assert! (st.query_row ([], |row| Ok (row.get::<_, Rc<str>> (2) .unwrap().as_ref() == "1")) .unwrap())});
std::fs::remove_file ("foobar2.csv") .unwrap()}
/// Cost of stepping the cursor by one row; the scan restarts after row 12343.
#[bench] fn csq_next (bm: &mut test::Bencher) {
let db = rusqlite::Connection::open_in_memory().unwrap();
super::csq::csq_load (&db) .unwrap();
gen ("foobar3.csv", 12345);
db.execute_batch ("CREATE VIRTUAL TABLE vt USING csq (path=foobar3.csv)") .unwrap();
// Statement and cursor live behind raw pointers so the closure can restart the query
// (`Rows` borrows the statement, which the borrow checker cannot express here).
let st = Box::into_raw (Box::new (db.prepare ("SELECT * FROM vt") .unwrap()));
let mut rows = Box::into_raw (Box::new (unsafe {(*st).query ([]) .unwrap()}));
let mut i = 1;
bm.iter (|| {
if i == 0 {
unsafe {drop (Box::from_raw (rows))};
rows = Box::into_raw (Box::new (unsafe {(*st).query ([]) .unwrap()}));
i += 1
} else if i < 12345 - 1 {
let row = unsafe {(*rows).next().unwrap().unwrap()};
let ri = row.get::<_, Rc<str>> (2) .unwrap();
let ri: i32 = ri.parse().unwrap();
assert_eq! (ri, i);
i += 1
} else {
i = 0}});
// Free the live cursor before the statement it borrows from.
unsafe {drop (Box::from_raw (rows))};
unsafe {drop (Box::from_raw (st))};
std::fs::remove_file ("foobar3.csv") .unwrap()}}
/// Feeds byte `ch` into a CRC-16/CCITT register (polynomial 0x1021), MSB first, with no
/// augmentation.
///
/// Call `crc16ccitt_aug` on the final value to flush the 16 augmentation zero bits.
pub fn crc16ccitt (crc: u16, ch: u8) -> u16 {
  (0..8u32) .rev() .fold (crc, |acc, bit| {
    let carry = (acc & 0x8000) != 0;
    // Shift the register and bring the next message bit into the vacated LSB.
    let shifted = (acc << 1) | ((ch >> bit) & 1) as u16;
    if carry {shifted ^ 0x1021} else {shifted}})}
/// Pushes 16 zero bits through a CRC-16/CCITT register (polynomial 0x1021).
///
/// This finalises a checksum built with `crc16ccitt`, and is equivalent to feeding it
/// two zero bytes.
pub fn crc16ccitt_aug (crc: u16) -> u16 {
  (0..16) .fold (crc, |acc, _| {
    let carry = (acc & 0x8000) != 0;
    let shifted = acc << 1;
    if carry {shifted ^ 0x1021} else {shifted}})}
/// CRC-16/CCITT throughput and known-answer benchmarks (nightly only).
#[cfg(all(test, feature = "nightly"))] mod crc_bench {
  extern crate test;
  use crate::lines::{crc16ccitt, crc16ccitt_aug};
  use std::io::Write;
  use std::rc::Rc;
  use test::black_box;

  /// Throughput over a 1234-byte ramp (0, 1, …, 255, 0, …). Each iteration starts one
  /// byte later, wrapping back to offset 0 after offset 321.
  #[bench] fn crc16mb (bm: &mut test::Bencher) {
    let mut buf = [0u8; 1234];
    for (ix, slot) in buf.iter_mut().enumerate() {*slot = ix as u8}
    let mut start = 0;
    let mut total = 0;
    bm.iter (|| {
      let chunk = &buf[start..];
      let mut crc = 0xFFFF;
      for &ch in chunk {crc = crc16ccitt (crc, black_box (ch))}
      total += chunk.len();
      start = if 321 <= start {0} else {start + 1}});
    bm.bytes = total as u64}
  /// Augmenting the 0xFFFF seed yields 0x1D0F.
  #[bench] fn crc16 (bm: &mut test::Bencher) {
    bm.iter (|| {
      let aug = crc16ccitt_aug (black_box (0xFFFF));
      assert_eq! (0x1D0F, aug)})}
  /// Single byte `A`: 0xE1B1 before augmentation, 0x9479 after.
  #[bench] fn crc16_a (bm: &mut test::Bencher) {
    assert_eq! (0xE1B1, crc16ccitt (0xFFFF, black_box (b'A')));
    bm.iter (|| {
      let raw = crc16ccitt (0xFFFF, black_box (b'A'));
      assert_eq! (0x9479, crc16ccitt_aug (black_box (raw)))})}
  /// The standard check string `123456789` yields 0xE5CC after augmentation.
  #[bench] fn crc16_123456789 (bm: &mut test::Bencher) {
    bm.iter (|| {
      let crc = b"123456789" .iter() .fold (0xFFFF, |acc, &ch| crc16ccitt (acc, black_box (ch)));
      assert_eq! (0xE5CC, crc16ccitt_aug (black_box (crc)))})}
  /// Baseline: a wrapping 8-bit byte sum of `123456789` is 0xDD.
  #[bench] fn c8_123456789 (bm: &mut test::Bencher) {
    bm.iter (|| {
      let mut c8 = 0u8;
      for &b in b"123456789" {c8 = black_box (c8.wrapping_add (b))}
      assert_eq! (0xDD, c8)})}}
/// The few `stat(2)` fields this module needs.
#[derive (Clone, Debug)]
pub struct Stat {
  /// Size in bytes (`st_size`).
  pub len: i64,
  /// Last modification time in hundredths of a second since the Unix epoch.
  pub lmc: u64,
  /// Whether the entry is a directory.
  pub dir: bool}
#[cfg(unix)] pub fn fstat (fd: RawFd) -> Re<Stat> {
let mut buf: libc::stat = unsafe {MaybeUninit::zeroed().assume_init()};
let rc = unsafe {libc::fstat (fd, &mut buf)};
if rc == -1 {fail! ((io::Error::last_os_error()))}
let lmc = (buf.st_mtime as u64 * 100) + (buf.st_mtime_nsec / 10000000) as u64;
let dir = buf.st_mode & libc::S_IFMT == libc::S_IFDIR;
Re::Ok (Stat {len: buf.st_size as i64, lmc, dir})}
/// A directory held open by descriptor, giving `*at`-style access to its entries.
#[cfg(unix)] pub struct Dir {
  /// Descriptor of the open directory.
  pub fd: RawFd,
  /// `DIR` stream created from `fd` on the first listing; null until then.
  dir: RefCell<*mut libc::DIR>}
// SAFETY: the descriptor and `DIR` pointer are owned exclusively by this value, so moving
// it to another thread is fine. It is deliberately not `Sync`.
#[cfg(unix)] unsafe impl Send for Dir {}
#[cfg(unix)] impl Dir {
pub fn new (path: &dyn AsRef<Path>) -> Re<Dir> {
let path = ffi::CString::new (path.as_ref().to_str()?)?;
let flags = libc::O_RDONLY | libc::O_CLOEXEC | libc::O_DIRECTORY | libc::O_NOCTTY | libc::O_NOATIME;
let fd = unsafe {libc::open (path.as_ptr(), flags, 0)};
if fd == -1 {fail! ((io::Error::last_os_error()))}
Re::Ok (Dir {fd, dir: RefCell::new (null_mut())})}
pub fn list (&self, cb: &mut dyn FnMut (&[u8]) -> Re<bool>) -> Re<()> {
if self.dir.borrow().is_null() {
let dir = unsafe {libc::fdopendir (self.fd)};
if dir.is_null() {fail! ((io::Error::last_os_error()))}
self.dir.replace (dir);
} else {
unsafe {libc::rewinddir (*self.dir.borrow())}}
loop {
let dp = unsafe {libc::readdir (*self.dir.borrow())};
if dp.is_null() {break}
let name = unsafe {ffi::CStr::from_ptr ((*dp).d_name.as_ptr())};
let name = name.to_bytes();
if name == b"." || name == b".." {continue}
if !cb (name)? {break}}
Re::Ok(())}
pub fn file (&self, name: &[u8], creat: bool, append: bool) -> Re<fs::File> {
use std::os::fd::FromRawFd;
let mut flags = libc::O_RDWR | libc::O_CLOEXEC | libc::O_NOCTTY | libc::O_NOATIME;
if creat {flags |= libc::O_CREAT}
if append {flags |= libc::O_APPEND}
let cname = ffi::CString::new (name)?;
let fd = unsafe {libc::openat (self.fd, cname.as_ptr(), flags, 0o0600)};
if fd == -1 {fail! ((String::from_utf8_lossy (name)) "] " (io::Error::last_os_error()))}
Re::Ok (unsafe {fs::File::from_raw_fd (fd)})}
pub fn stat (&self, name: &[u8]) -> Re<Option<Stat>> {
let cname = ffi::CString::new (name)?;
let mut buf: libc::stat = unsafe {MaybeUninit::zeroed().assume_init()};
let rc = unsafe {libc::fstatat (self.fd, cname.as_ptr(), &mut buf, 0)};
if rc == -1 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::NotFound {return Re::Ok (None)}
fail! ((io::Error::last_os_error()))}
let lmc = (buf.st_mtime as u64 * 100) + (buf.st_mtime_nsec / 10000000) as u64;
let dir = buf.st_mode & libc::S_IFMT == libc::S_IFDIR;
Re::Ok (Some (Stat {len: buf.st_size as i64, lmc, dir}))}
pub fn unlink (&self, name: &[u8], dir: bool) -> Re<()> {
let cname = ffi::CString::new (name)?;
let flags = if dir {libc::AT_REMOVEDIR} else {0};
let rc = unsafe {libc::unlinkat (self.fd, cname.as_ptr(), flags)};
if rc == -1 {fail! ((io::Error::last_os_error()))}
Re::Ok(())}}
#[cfg(unix)] impl Drop for Dir {
  /// Closes the directory. Once `list` has wrapped the descriptor in a `DIR` stream,
  /// the stream owns it and `closedir` is used; otherwise the descriptor is closed directly.
  fn drop (&mut self) {
    let stream = *self.dir.get_mut();
    // SAFETY: exactly one of `stream` / `fd` owns the descriptor, and it is released once.
    unsafe {
      if stream.is_null() {libc::close (self.fd);} else {libc::closedir (stream);}}}}
/// Placeholder `Dir` for non-Unix targets. It only remembers its path, and every
/// operation fails with "tbd".
#[cfg(not(unix))] pub struct Dir {
  _path: PathBuf}
#[cfg(not(unix))] impl Dir {
  /// Records `path`; never fails.
  pub fn new (path: &dyn AsRef<Path>) -> Re<Dir> {
    let _path = PathBuf::from (path.as_ref());
    Re::Ok (Dir {_path})}
  /// Not implemented on this platform.
  pub fn list (&self, _cb: &mut dyn FnMut (&[u8]) -> Re<bool>) -> Re<()> {
    fail! ("tbd")}
  /// Not implemented on this platform.
  pub fn file (&self, _name: &[u8], _creat: bool, _append: bool) -> Re<fs::File> {
    fail! ("tbd")}
  /// Not implemented on this platform.
  pub fn stat (&self, _name: &[u8]) -> Re<Option<Stat>> {
    fail! ("tbd")}
  /// Not implemented on this platform.
  pub fn unlink (&self, _name: &[u8], _dir: bool) -> Re<()> {
    fail! ("tbd")}}
/// Pull-style file synchronisation built on `fast_rsync` signatures and deltas
/// (requires the `serde` and `fast_rsync` features).
///
/// `rsync_pull` locks a local file and queues a signature request. `handle` applies the
/// peer's delta replies and releases the locks.
#[cfg(all(feature = "serde", feature = "fast_rsync"))]
pub mod sync {
  use ahash::RandomState;
  use core::str::from_utf8;
  use crate::{fail, slurp, now_ms};
  use crate::lines::{lock, Dir, Lock};
  use crate::re::Re;
  use fast_rsync::{Signature, SignatureOptions};
  use fomat_macros::fomat;
  use indexmap::IndexMap as IndexMapB;
  use indexmap::map::Entry;
  use inlinable_string::{InlinableString, StringExt};
  use serde::{Deserialize, Serialize};
  use std::fs;
  use std::io::{self, Read, Write, Seek};
  use std::path::Path;
  use std::time::{Duration, UNIX_EPOCH};
  type IndexMap<K, V> = IndexMapB<K, V, RandomState>;

  /// One entry of the files table: file `name` in directory `dir`, shared by the
  /// `&`-separated `hosts`.
  #[derive (Clone, Debug)]
  pub struct File {
    pub hosts: InlinableString,
    pub dir: InlinableString,
    pub name: InlinableString}

  /// Parses a `hosts,dir,name` CSV into [`File`]s.
  ///
  /// The header line is mandatory and blank lines are skipped. Missing trailing columns
  /// are left empty; a fourth column or a non-UTF-8 field is an error.
  pub fn parse_files (csv: &[u8]) -> Re<Vec<File>> {
    let mut files = Vec::new();
    for (line, lx) in csv.split (|ch| *ch == b'\n') .zip (0..) {
      if lx == 0 {if line != b"hosts,dir,name" {fail! ("!head")} continue}
      if line.is_empty() {continue}
      let (mut hosts, mut dir, mut name) = (InlinableString::new(), InlinableString::new(), InlinableString::new());
      for (col, cx) in line.split (|ch| *ch == b',') .zip (0..) {
        if cx == 0 {hosts = InlinableString::from (from_utf8 (col.into())?)
        } else if cx == 1 {dir = InlinableString::from (from_utf8 (col.into())?)
        } else if cx == 2 {name = InlinableString::from (from_utf8 (col.into())?)
        } else {fail! ([=cx])}}
      files.push (File {hosts, dir, name})}
    Re::Ok (files)}

  /// Synchronisation state; both maps are keyed by the index into `files`.
  ///
  /// Field order is load-bearing. Fields drop in declaration order, and a [`Lock`] only
  /// remembers the raw descriptor of the file it locked. `locks` is therefore declared
  /// before `fds`, so every lock is released while its descriptor is still open. With
  /// `fds` first, the unlock would run on closed and possibly reused descriptor numbers.
  #[derive (Default)]
  pub struct State {
    pub files: Vec<File>,
    /// Per-file lock, the offset pulled from, and the local basis bytes read from there.
    pub locks: IndexMap<u16, (Lock, u32, Vec<u8>)>,
    pub fds: IndexMap<u16, fs::File>,
    pub dirs: IndexMap<InlinableString, Dir>}

  /// Request sent to the peer.
  #[derive (Debug, Deserialize, Serialize)]
  pub enum Req {
    /// `fx` indexes `State::files`. `flen` is the local file length and `lm` its
    /// modification time in hundredths of a second. `ofs` is where the signed bytes
    /// start, and `sig` is the serialized `fast_rsync` signature of those bytes.
    RsyncProto1 {
      fx: u16,
      flen: u32,
      lm: u64,
      ofs: u32,
      sig: Vec<u8>}}

  /// Reply received from the peer.
  #[derive (Debug, Deserialize, Serialize)]
  pub enum Rep {
    /// `delta` rebuilds the bytes from `ofs` onward, and `dlen` caps the rebuilt length.
    /// `lm` is the modification time to stamp, in hundredths of a second.
    /// `bsum` is not used by [`handle`].
    RsyncProto1 {
      fx: u16,
      lm: u64,
      ofs: u32,
      delta: Vec<u8>,
      dlen: u32,
      bsum: u16}}

  /// Prepares a pull of file `name` and pushes a [`Req::RsyncProto1`] onto `reqs`.
  ///
  /// Steps:
  /// 1. Find the [`File`] entry whose hosts include `hostname`.
  /// 2. Open the local copy (creating it if needed) and lock it exclusively.
  /// 3. Read it from the chosen offset and push the request with its signature.
  ///
  /// Offsets: `tail < 0` starts `-tail` bytes before the end (clamped at 0), and
  /// `tail == 0` starts at 0. A positive `tail` gives 0 when `tail` is below the file
  /// length, and `tail` itself otherwise.
  /// `block` and `hash` are the `fast_rsync` block size and strong-hash length.
  /// The lock, the offset and the basis bytes are kept in `state.locks` for [`handle`].
  /// Files of 4 GiB or more are rejected.
  pub fn rsync_pull (reqs: &mut Vec<Req>, hostname: &str, state: &mut State, name: &str, tail: i32, block: u16, hash: u8) -> Re<()> {
    let (file, fx) = {let mut it = state.files.iter().zip (0u16..); 'file: loop {
      let Some ((file, fx)) = it.next() else {fail! ("!file")};
      if file.name != name {continue}
      for host in file.hosts.split ('&') {
        if host == hostname {break 'file (file, fx)}}}};
    let fd = match state.fds.entry (fx) {
      Entry::Vacant (ve) => {
        let path = Path::new (&file.dir[..]) .join (name);
        ve.insert (fs::OpenOptions::new() .read (true) .write (true) .create (true) .open (&path)?)},
      Entry::Occupied (oe) => oe.into_mut()};
    let (_lock, ofs, bytes) = match state.locks.entry (fx) {
      Entry::Vacant (ve) => {
        let Ok (lock) = lock (fd, true) else {fail! ("!lock: " (name))};
        ve.insert ((lock, 0, Vec::new()))},
      Entry::Occupied (oe) => oe.into_mut()};
    let meta = fd.metadata()?;
    let flen = meta.len();
    if (u32::MAX as u64) < flen {fail! ("!u32: " [=flen])}
    let flen = flen as u32;
    let lm = (meta.modified()?.duration_since (UNIX_EPOCH)? .as_millis() / 10) as u64;
    // Offsets are computed in i64: `flen as i32` wraps for files over 2 GiB and
    // `flen + tail` could overflow i32.
    let (flen64, tail64) = (flen as i64, tail as i64);
    *ofs = if tail < 0 {
      0 .max (flen64 + tail64) as u32
    } else if 0 < tail {
      // NOTE(review): this looks inverted (0 when `tail` fits inside the file, `tail`
      // beyond EOF); kept as-is pending confirmation of the intended semantics.
      if tail64 < flen64 {0} else {tail as u32}
    } else {0};
    let pos = fd.seek (io::SeekFrom::Start (*ofs as u64))?;
    if pos != *ofs as u64 {fail! ("!seek " [=ofs] ' ' [=pos])}
    bytes.clear();
    // `ofs` may exceed `flen` (positive `tail` past EOF); saturate instead of underflowing.
    bytes.reserve_exact (flen.saturating_sub (*ofs) as usize);
    fd.read_to_end (bytes)?;
    let sig = Signature::calculate (&bytes, SignatureOptions {
      block_size: block as u32,
      crypto_hash_size: hash as u32});
    reqs.push (Req::RsyncProto1 {fx, flen, lm, ofs: *ofs, sig: sig.into_serialized()});
    Re::Ok(())}

  /// Applies replies to earlier [`rsync_pull`] requests and releases each file's lock.
  ///
  /// For each reply, the file content from `ofs` onward is rebuilt from the kept basis
  /// bytes and the delta. If it differs from the basis, it is written at `ofs` and the
  /// file is truncated after it. In both cases the modification time is set from `lm`.
  pub fn handle (reps: &[Rep], hostname: &str, state: &mut State) -> Re<()> {
    for rep in reps {match rep {
      Rep::RsyncProto1 {fx, lm, ofs, delta, dlen, bsum:_} => {
        // `fx` comes back from the peer: reject out-of-range values instead of panicking.
        let Some (file) = state.files.get (*fx as usize) else {fail! ("!fx: " (fx))};
        if !file.hosts.split ('&') .any (|h| h == hostname) {fail! ("!hosts")}
        let Some ((lock, ofs0, mut bytes)) = state.locks.swap_remove (fx) else {fail! ("!lock: " (file.name))};
        let Some (fd) = state.fds.get_mut (fx) else {fail! ("!fd: " (file.name))};
        // NOTE(review): the basis is dropped when the peer answered for another offset,
        // unless we pulled from 0 — confirm this is intended.
        if *ofs != ofs0 && ofs0 != 0 {bytes.clear()}
        let mut buf = Vec::with_capacity (*dlen as usize);
        fast_rsync::apply_limited (&bytes, &delta, &mut buf, *dlen as usize)?;
        let lm = UNIX_EPOCH + Duration::from_millis (*lm * 10);
        if bytes == buf {
          // Content unchanged: only sync the modification time.
          fd.set_modified (lm)?;
        } else {
          let pos = fd.seek (io::SeekFrom::Start (*ofs as u64))?;
          if pos != *ofs as u64 {fail! ("!seek " [=ofs] ' ' [=pos])}
          fd.write_all (&buf)?;
          fd.set_len (*ofs as u64 + buf.len() as u64)?;
          fd.set_modified (lm)?;}
        drop (lock);}}}
    Re::Ok(())}}