use super::util::LockedFile;
use super::SFTPConnection;
use crate::repository::backend::common::sync_backend::SyncIndex;
use crate::repository::backend::common::IndexTransaction;
use crate::repository::backend::{BackendError, Result, SegmentDescriptor};
use crate::repository::ChunkID;
use serde_cbor as cbor;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::{BufWriter, Seek, SeekFrom};
use std::path::PathBuf;
use std::rc::Rc;
/// Chunk index that is held in memory and persisted to a file reached over SFTP.
///
/// An instance is obtained through `SFTPIndex::connect`, which fills `state` by
/// replaying the transactions stored in the remote index files.
#[derive(Debug)]
pub struct SFTPIndex {
/// Current view of the index: the segment location recorded for each chunk ID.
state: HashMap<ChunkID, SegmentDescriptor>,
/// The locked index file that `commit_index` appends transactions to.
file: LockedFile,
/// Transactions recorded by `set_chunk` that `commit_index` has not written yet.
changes: Vec<IndexTransaction>,
}
impl SFTPIndex {
    /// Connects to the SFTP server described by `settings` and loads the index.
    ///
    /// The repository directory and its `index` subdirectory are created (mode
    /// `0o775`) when `stat` on them fails. Every regular file in the index
    /// directory whose name parses as a `u64` is then replayed in ascending
    /// numeric order, with later transactions overwriting earlier ones for the
    /// same chunk ID.
    ///
    /// After loading, the first existing index file that can be locked is kept
    /// as the file new transactions are appended to. If none can be locked, a
    /// new file named one past the highest existing id (or `0`) is opened.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be established, if any SFTP
    /// operation (`mkdir`, `readdir`, `open`, locking) fails, if the index path
    /// exists but is a regular file, or if the newly opened index file cannot
    /// be locked.
    ///
    /// # Panics
    ///
    /// Panics if the established connection does not expose an SFTP session.
    pub fn connect(settings: impl Into<SFTPConnection>) -> Result<Self> {
        let connection = settings.into().with_connection()?;
        let sftp = connection
            .sftp()
            .expect("Connection succeed but no sftp session?");

        // Make sure the repository directory exists.
        let repository_path = PathBuf::from(&connection.settings().path);
        if sftp.stat(&repository_path).is_err() {
            sftp.mkdir(&repository_path, 0o775)?;
        }

        // Make sure the index directory exists and really is a directory.
        let index_path = repository_path.join("index");
        if let Ok(file_stat) = sftp.stat(&index_path) {
            if file_stat.file_type().is_file() {
                return Err(BackendError::IndexError(format!(
                    "Failed to load index, {:?} is a file, not a directory",
                    index_path
                )));
            }
        } else {
            sftp.mkdir(&index_path, 0o775)?;
        }

        // Collect `(id, path)` for every regular file with a numeric name.
        let mut items = sftp
            .readdir(&index_path)?
            .into_iter()
            .filter_map(|(path, file_stat)| {
                if !file_stat.file_type().is_file() {
                    return None;
                }
                let id = path.file_name()?.to_string_lossy().parse::<u64>().ok()?;
                Some((id, path))
            })
            .collect::<Vec<_>>();
        // Stable sort so files are replayed oldest id first.
        items.sort_by_key(|(id, _)| *id);

        // Replay every transaction into the in-memory state.
        let mut state: HashMap<ChunkID, SegmentDescriptor> = HashMap::new();
        for (_, path) in &items {
            let mut file = sftp.open(path)?;
            let records =
                cbor::Deserializer::from_reader(&mut file).into_iter::<IndexTransaction>();
            for record in records {
                match record {
                    Ok(tx) => {
                        state.insert(tx.chunk_id, tx.descriptor);
                    }
                    // NOTE(review): the first record that fails to decode silently
                    // ends replay of this file; presumably this tolerates a
                    // truncated trailing write - confirm.
                    Err(_) => break,
                }
            }
        }

        // Reuse the first existing index file we manage to lock.
        for (_, path) in &items {
            if let Some(file) = LockedFile::open_read_write(path, Rc::clone(&sftp))? {
                return Ok(SFTPIndex {
                    state,
                    file,
                    changes: Vec::new(),
                });
            }
        }

        // No existing file could be locked, so open a fresh one with the next id.
        let id = items.last().map_or(0, |(last_id, _)| last_id + 1);
        let path = index_path.join(id.to_string());
        // Another client may pick the same id concurrently and win the lock, so
        // failing to lock here is reported as an error rather than a panic.
        let file = LockedFile::open_read_write(path, Rc::clone(&sftp))?.ok_or_else(|| {
            BackendError::IndexError(format!(
                "Unable to lock newly created index file {} in {:?}",
                id, index_path
            ))
        })?;
        Ok(SFTPIndex {
            state,
            file,
            changes: Vec::new(),
        })
    }
}
impl SyncIndex for SFTPIndex {
    /// Returns the segment location recorded for `id`, if any.
    fn lookup_chunk(&mut self, id: ChunkID) -> Option<SegmentDescriptor> {
        self.state.get(&id).copied()
    }

    /// Records `location` for `id` unless the chunk is already known.
    ///
    /// A new entry is added to the in-memory state and queued as a pending
    /// transaction; nothing is written to the index file until
    /// `commit_index` is called. An existing entry is left untouched.
    fn set_chunk(&mut self, id: ChunkID, location: SegmentDescriptor) -> Result<()> {
        if let std::collections::hash_map::Entry::Vacant(slot) = self.state.entry(id) {
            slot.insert(location);
            self.changes.push(IndexTransaction {
                chunk_id: id,
                descriptor: location,
            });
        }
        Ok(())
    }

    /// Returns the IDs of every chunk currently present in the index.
    fn known_chunks(&mut self) -> HashSet<ChunkID> {
        // `set_chunk` inserts into `state` before queueing a change, so pending
        // changes never contain an ID that `state` lacks.
        self.state.keys().copied().collect()
    }

    /// Appends all pending transactions to the end of the index file and syncs it.
    ///
    /// # Errors
    ///
    /// Returns an error if seeking, serializing, flushing, or syncing fails.
    /// Pending transactions are only discarded after everything succeeded, so
    /// a failed commit can be retried.
    fn commit_index(&mut self) -> Result<()> {
        let mut writer = BufWriter::new(&mut self.file);
        writer.seek(SeekFrom::End(0))?;
        for tx in &self.changes {
            cbor::ser::to_writer(&mut writer, tx)?;
        }
        // Flush explicitly: `BufWriter`'s drop would swallow any write error.
        std::io::Write::flush(&mut writer)?;
        drop(writer);
        self.file.fsync()?;
        self.changes.clear();
        Ok(())
    }

    /// Returns the number of chunks currently present in the index.
    fn chunk_count(&mut self) -> usize {
        self.state.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::prelude::sftp::SFTPSettings;
    use std::env;

    /// Reads an environment variable, panicking if it is set but not valid Unicode.
    fn env_string(key: &str) -> Option<String> {
        env::var_os(key).map(|value| value.into_string().unwrap())
    }

    /// Descriptor value shared by the tests below.
    fn sample_descriptor() -> SegmentDescriptor {
        SegmentDescriptor {
            segment_id: 42,
            start: 43,
        }
    }

    /// Builds connection settings for `path` from the `ASURAN_SFTP_*` environment variables.
    fn get_settings(path: String) -> SFTPSettings {
        let hostname = env_string("ASURAN_SFTP_HOSTNAME").expect("Server must be set");
        let username = env_string("ASURAN_SFTP_USER").unwrap_or_else(|| "asuran".to_string());
        let password = env_string("ASURAN_SFTP_PASS").unwrap_or_else(|| "asuran".to_string());
        let port = env_string("ASURAN_SFTP_PORT")
            .unwrap_or_else(|| "22".to_string())
            .parse::<u16>()
            .expect("Unable to parse port");
        SFTPSettings {
            hostname,
            username,
            port: Some(port),
            password: Some(password),
            path,
        }
    }

    /// Connects to the index stored under `path` on the test server.
    fn get_index(path: impl AsRef<str>) -> SFTPIndex {
        let settings = get_settings(path.as_ref().to_string());
        SFTPIndex::connect(settings).expect("Unable to connect to index")
    }

    // Connecting to a fresh repository path must succeed.
    #[test]
    fn connect() {
        let _index = get_index("asuran/index_connect");
    }

    // A committed chunk location must be readable after reconnecting.
    #[test]
    fn set_lookup_chunk() {
        let mut writer = get_index("asuran/index_set_lookup_chunk");
        let chunk_id = ChunkID::random_id();
        let descriptor = sample_descriptor();
        writer
            .set_chunk(chunk_id, descriptor)
            .expect("Unable to write chunk location");
        writer.commit_index().expect("Unable to commit index");
        drop(writer);

        let mut reader = get_index("asuran/index_set_lookup_chunk");
        let result = reader
            .lookup_chunk(chunk_id)
            .expect("Unable to read chunk location");
        assert!(descriptor == result);
    }

    // The chunk count after reconnecting must match the number of committed chunks.
    #[test]
    fn chunk_count() {
        let mut writer = get_index("asuran/index_chunk_count");
        let descriptor = sample_descriptor();
        (0..10).for_each(|_| {
            writer
                .set_chunk(ChunkID::random_id(), descriptor)
                .expect("Unable to set chunk");
        });
        println!("Index changes length: {}", writer.changes.len());
        println!("Index state length: {}", writer.state.len());
        writer.commit_index().expect("Unable to commit index");
        drop(writer);

        let mut reader = get_index("asuran/index_chunk_count");
        assert_eq!(reader.chunk_count(), 10);
    }

    // The set of known chunks after reconnecting must equal the committed set.
    #[test]
    fn known_chunks() {
        let mut writer = get_index("asuran/index_known_chunks");
        let descriptor = sample_descriptor();
        let chunks: HashSet<ChunkID> = (0..10).map(|_| ChunkID::random_id()).collect();
        for chunk in chunks.iter().copied() {
            writer
                .set_chunk(chunk, descriptor)
                .expect("Unable to set chunk");
        }
        writer.commit_index().expect("unable to commit index");
        drop(writer);

        let mut reader = get_index("asuran/index_known_chunks");
        let new_chunks = reader.known_chunks();
        assert!(new_chunks == chunks);
    }
}