#![ allow (unused_parens) ]
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::LinkedList;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
use std::ops::DerefMut;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use rust_crypto::digest::Digest;
use rust_crypto::sha1::Sha1;
use rust_crypto::sha2::Sha256;
use futures;
use futures::BoxFuture;
use futures::Future;
use futures_cpupool::CpuPool;
use num_cpus;
use output;
use output::Output;
use protobuf::stream::CodedInputStream;
use rustc_serialize::hex::ToHex;
use misc::*;
use zbackup::bundle_loader::*;
use zbackup::chunk_cache::*;
use zbackup::data::*;
use zbackup::disk_format::*;
use zbackup::index_cache::*;
use zbackup::randaccess::*;
use zbackup::repository_core::*;
/// Clonable handle to an open zbackup repository.
///
/// Every field is a shared/pooled handle, so cloning a `Repository` is cheap
/// and all clones share the same caches and mutable state.
#[ derive (Clone) ]
pub struct Repository {
// Immutable configuration and repository core, shared between clones.
data: Arc <RepositoryData>,
// Mutable state (index cache, outstanding bundle set), mutex-guarded.
state: Arc <Mutex <RepositoryState>>,
// CPU thread pool sized from `max_threads` in `open_impl`.
// NOTE(review): not referenced elsewhere in this file — confirm it is used.
cpu_pool: CpuPool,
// Asynchronously loads bundle files from disk.
bundle_loader: BundleLoader,
// Multi-tier (memory and filesystem) cache of chunk data, keyed by chunk id.
chunk_cache: ChunkCache <ChunkId>,
}
// Shared map from chunk id to chunk data, as yielded by the bundle loader
// (see `load_chunk_async_async_impl`).
type ChunkMap = Arc <HashMap <ChunkId, ChunkData>>;
/// Tunable parameters used when opening a [`Repository`].
///
/// Start from [`Repository::default_config`] and override fields as needed.
#[ derive (Clone) ]
pub struct RepositoryConfig {
/// Maximum entries in the uncompressed in-memory chunk cache.
pub max_uncompressed_memory_cache_entries: usize,
/// Maximum entries in the compressed in-memory chunk cache.
pub max_compressed_memory_cache_entries: usize,
/// Maximum entries in the compressed filesystem chunk cache (split into two
/// tiers by `open_impl`).
pub max_compressed_filesystem_cache_entries: usize,
/// Maximum worker threads for the CPU pool, bundle loader and chunk cache.
pub max_threads: usize,
/// Directory used for the filesystem chunk cache.
pub filesystem_cache_path: String,
// NOTE(review): both default to zero in `default_config`; their semantics
// are not evident from this file — confirm against callers.
pub work_jobs_total: usize, pub work_jobs_batch: usize, }
/// Immutable data shared by all clones of a [`Repository`].
struct RepositoryData {
/// Configuration the repository was opened with.
config: RepositoryConfig,
/// Core repository access (paths, storage info, encryption key).
core: Arc <RepositoryCore>,
}
/// Snapshot of runtime statistics, returned by [`Repository::status`].
pub struct RepositoryStatus {
/// Current status of the bundle loader.
pub bundle_loader: BundleLoaderStatus,
/// Current status of the chunk cache.
pub chunk_cache: ChunkCacheStatus,
}
/// Future resolving to a chunk's data.
type ChunkFuture =
BoxFuture <ChunkData, String>;
/// Two-stage chunk future: the outer future resolves to a further future,
/// which in turn resolves to the chunk data.
type ChunkDoubleFuture =
BoxFuture <ChunkFuture, String>;
/// Mutable repository state, guarded by the mutex in `Repository::state`.
struct RepositoryState {
/// Lazily loaded cache of the repository's chunk indexes.
index_cache: IndexCache,
/// Bundles whose chunks should be bulk-inserted into the chunk cache when
/// their load completes (see `load_chunk_async_async_impl`).
bundles_needed: HashSet <BundleId>,
}
impl Repository {
/// Builds a `RepositoryConfig` populated with the compiled-in defaults.
///
/// The thread count defaults to twice the number of logical CPUs; the
/// work-job fields default to zero.
pub fn default_config () -> RepositoryConfig {

    let default_threads =
        num_cpus::get () * 2;

    RepositoryConfig {
        max_uncompressed_memory_cache_entries: MAX_UNCOMPRESSED_MEMORY_CACHE_ENTRIES,
        max_compressed_memory_cache_entries: MAX_COMPRESSED_MEMORY_CACHE_ENTRIES,
        max_compressed_filesystem_cache_entries: MAX_COMPRESSED_FILESYSTEM_CACHE_ENTRIES,
        max_threads: default_threads,
        filesystem_cache_path: FILESYSTEM_CACHE_PATH.to_owned (),
        work_jobs_total: 0,
        work_jobs_batch: 0,
    }

}
/// Opens the repository at `repository_path`, optionally with a password
/// file (both forwarded to `RepositoryCore::open` via `open_impl`).
///
/// Generic convenience wrapper: converts its path arguments and delegates
/// to the monomorphic `open_impl`.
#[ inline ]
pub fn open <
    RepositoryPath: AsRef <Path>,
    PasswordFilePath: AsRef <Path>,
> (
    output: & Output,
    repository_config: RepositoryConfig,
    repository_path: RepositoryPath,
    password_file_path: Option <PasswordFilePath>,
) -> Result <Repository, String> {

    let repository_path: & Path =
        repository_path.as_ref ();

    let password_file_path: Option <& Path> =
        password_file_path.as_ref ().map (
            |path| path.as_ref ());

    Self::open_impl (
        output,
        repository_config,
        repository_path,
        password_file_path,
    )

}
/// Monomorphic implementation behind [`Repository::open`].
///
/// Opens the repository core, then constructs the thread pool, bundle
/// loader and chunk cache according to `repository_config`.
fn open_impl (
    output: & Output,
    repository_config: RepositoryConfig,
    repository_path: & Path,
    password_file_path: Option <& Path>,
) -> Result <Repository, String> {

    let repository_core =
        Arc::new (
            RepositoryCore::open (
                output,
                repository_path,
                password_file_path,
            ) ?
        );

    let cpu_pool =
        CpuPool::new (
            repository_config.max_threads);

    let bundle_loader =
        BundleLoader::new (
            repository_core.clone (),
            repository_config.max_threads);

    // Split the filesystem cache entries roughly 7:1 between the two shares
    // `ChunkCache::new` accepts. The second share is computed by subtraction
    // so that the shares always sum to exactly the configured total — the
    // previous `* 7/8` plus `* 1/8` integer divisions could lose up to seven
    // entries to rounding.
    let filesystem_cache_large_share =
        repository_config.max_compressed_filesystem_cache_entries * 7 / 8;

    let filesystem_cache_small_share =
        repository_config.max_compressed_filesystem_cache_entries
            - filesystem_cache_large_share;

    let chunk_cache =
        ChunkCache::new (
            repository_config.filesystem_cache_path.clone (),
            repository_config.max_threads,
            repository_config.max_uncompressed_memory_cache_entries,
            repository_config.max_compressed_memory_cache_entries,
            filesystem_cache_large_share,
            filesystem_cache_small_share,
            true,
        ) ?;

    let repository_data =
        Arc::new (RepositoryData {
            config: repository_config,
            core: repository_core.clone (),
        });

    let repository_state =
        Arc::new (Mutex::new (RepositoryState {
            index_cache: IndexCache::new (
                repository_core.clone (),
            ),
            bundles_needed: HashSet::new (),
        }));

    Ok (Repository {
        data: repository_data,
        state: repository_state,
        cpu_pool: cpu_pool,
        bundle_loader: bundle_loader,
        chunk_cache: chunk_cache,
    })

}
/// Loads the repository's chunk indexes into memory, unless they have
/// already been loaded.
pub fn load_indexes (
    & self,
    output: & Output,
) -> Result <(), String> {

    let mut state_guard =
        self.state.lock ().unwrap ();

    state_guard.index_cache.load_if_not_loaded (output)

}
/// Reloads the repository's chunk indexes from disk, discarding any
/// previously loaded state.
pub fn reload_indexes (
    & self,
    output: & Output,
) -> Result <(), String> {

    let mut state_guard =
        self.state.lock ().unwrap ();

    state_guard.index_cache.reload (output)

}
/// Reads the named backup and fully expands its instruction stream.
///
/// The instruction stream is followed `iterations ()` times, feeding each
/// pass's output back in as the next pass's input. Returns the expanded
/// data together with the SHA-256 checksum stored in the backup record.
pub fn read_and_expand_backup (
& self,
output: & Output,
backup_name: & str,
) -> Result <(Vec <u8>, [u8; 32]), String> {
// Indexes are required so instructions can resolve chunk ids.
self.load_indexes (
output,
) ?;
let output_job =
output_job_start! (
output,
"Loading backup {}",
backup_name);
let backup_info =
backup_read_path (
self.data.core.backup_path (
backup_name),
self.data.core.encryption_key (),
) ?;
let mut input =
Cursor::new (
backup_info.backup_data ().to_owned ());
// Each pass rewrites the stream in full; the final pass's output is the
// expanded backup data.
for _iteration in 0 .. backup_info.iterations () {
let mut temp_output: Cursor <Vec <u8>> =
Cursor::new (
Vec::new ());
// NOTE(review): this SHA-1 digest is fed but its result is never read;
// it only satisfies follow_instructions' digest parameter — confirm.
let mut sha1_digest =
Sha1::new ();
self.follow_instructions (
& mut input,
& mut temp_output,
& mut sha1_digest,
& |count| {
// Tick the progress job every 16 chunks.
if count & 0xf == 0xf {
output_job.tick ();
}
},
) ?;
input =
Cursor::new (
temp_output.into_inner ());
}
output_job.complete ();
Ok (
(
input.into_inner (),
backup_info.sha256 (),
)
)
}
/// Restores the named backup, writing the expanded data to `target`.
///
/// The backup name must be non-empty and begin with `'/'`. After expansion,
/// the data's SHA-256 sum is verified against the checksum stored in the
/// backup; a mismatch is reported as an error.
pub fn restore (
    & self,
    output: & Output,
    backup_name: & str,
    target: & mut Write,
) -> Result <(), String> {

    if backup_name.is_empty () {
        return Err (
            "Backup name must not be empty".to_string ());
    }

    // Idiomatic replacement for inspecting the first character directly;
    // behavior is identical since the empty case was rejected above.
    if ! backup_name.starts_with ('/') {
        return Err (
            "Backup name must begin with '/'".to_string ());
    }

    let (input_bytes, checksum) =
        self.read_and_expand_backup (
            output,
            backup_name,
        ) ?;

    let mut input =
        Cursor::new (
            input_bytes);

    let output_job =
        output_job_start! (
            output,
            "Restoring {}",
            backup_name);

    let mut sha256_sum =
        Sha256::new ();

    // Follow the (already fully expanded) instruction stream, writing chunk
    // data to the target and folding it into the running checksum. The
    // progress job is ticked every 128 chunks.
    self.follow_instructions (
        & mut input,
        target,
        & mut sha256_sum,
        & |count| {
            if count & 0x7f == 0x00 {
                output_job.tick ();
            }
        },
    ) ?;

    let mut sha256_sum_bytes: [u8; 32] =
        [0u8; 32];

    sha256_sum.result (
        & mut sha256_sum_bytes);

    if checksum != sha256_sum_bytes {
        return Err (
            format! (
                "Expected sha256 checksum {} but calculated {}",
                checksum.to_hex (),
                sha256_sum_bytes.to_hex ()));
    }

    output_job.complete ();

    Ok (())

}
#[ doc (hidden) ]
/// Restores the named backup via the [`RandomAccess`] reader, for testing.
///
/// Unlike [`Repository::restore`], this exercises the random-access code
/// path and performs no checksum verification.
pub fn restore_test (
    & self,
    output: & Output,
    backup_name: & str,
    target: & mut Write,
) -> Result <(), String> {

    let output_job =
        output_job_start! (
            output,
            "Restoring {}",
            backup_name);

    let mut input =
        RandomAccess::new (
            output,
            self,
            backup_name,
        ) ?;

    let mut buffer: Vec <u8> =
        vec! [0u8; BUFFER_SIZE];

    loop {

        let bytes_read =
            io_result (
                input.read (
                    & mut buffer),
            ) ?;

        if bytes_read == 0 {
            break;
        }

        // write_all rather than write: a plain write may perform a short
        // write, silently dropping part of the restored data.
        io_result (
            target.write_all (
                & buffer [
                    0 .. bytes_read ]),
        ) ?;

    }

    output_job.complete ();

    Ok (())

}
/// Translates a single backup instruction into a two-stage chunk future.
///
/// The outer future resolves once the work has been scheduled; the inner
/// future resolves to the instruction's output bytes. An instruction may
/// carry a chunk reference, literal bytes, or both (chunk data first, with
/// the literal bytes appended).
fn follow_instruction_async_async (
& self,
debug: & Output,
backup_instruction: & DiskBackupInstruction,
) -> BoxFuture <BoxFuture <ChunkData, String>, String> {
// Chunk plus trailing literal bytes: load the chunk, then append the
// literal bytes to a copy of its data.
if backup_instruction.has_chunk_to_emit ()
&& backup_instruction.has_bytes_to_emit () {
let chunk_id =
backup_instruction.chunk_to_emit ();
let backup_instruction_bytes_to_emit =
backup_instruction.bytes_to_emit ().to_vec ();
self.get_chunk_async_async_debug (
debug,
chunk_id,
).map (
move |chunk_data_future|
chunk_data_future.map (
move |chunk_data|
// Copy the chunk bytes and chain the literal bytes on the end.
Arc::new (
chunk_data.iter ().map (
move |& value| value
).chain (
backup_instruction_bytes_to_emit.into_iter ()
).collect ())
).boxed ()
).boxed ()
// Chunk only: forward the chunk load future unchanged.
} else if backup_instruction.has_chunk_to_emit () {
let chunk_id =
backup_instruction.chunk_to_emit ();
self.get_chunk_async_async_debug (
debug,
chunk_id,
)
// Literal bytes only: both stages complete immediately.
} else if backup_instruction.has_bytes_to_emit () {
futures::done (Ok (
futures::done (Ok (
Arc::new (
backup_instruction.bytes_to_emit ().to_vec ())
)).boxed ()
)).boxed ()
// Neither present: the instruction is malformed.
} else {
futures::failed::<BoxFuture <ChunkData, String>, String> (
"Instruction with neither chunk or bytes".to_string ()
).boxed ()
}
}
#[ doc (hidden) ]
/// Follows a backup instruction stream, writing decoded data to `target`.
///
/// Thin wrapper around [`Repository::follow_instructions_debug`] with debug
/// output disabled. `digest` is fed every byte written; `progress` is called
/// with a running chunk count.
pub fn follow_instructions (
& self,
input: & mut Read,
target: & mut Write,
digest: & mut Digest,
progress: & Fn (u64),
) -> Result <(), String> {
self.follow_instructions_debug (
& output::null (),
input,
target,
digest,
progress,
)
}
#[ doc (hidden) ]
pub fn follow_instructions_debug (
& self,
debug: & Output,
input: & mut Read,
target: & mut Write,
digest: & mut Digest,
progress: & Fn (u64),
) -> Result <(), String> {
let mut coded_input_stream =
CodedInputStream::new (
input);
let mut count: u64 = 0;
enum JobTarget {
Chunk (ChunkData),
FutureChunk (BoxFuture <ChunkData, String>),
}
type Job = BoxFuture <JobTarget, String>;
let mut current_chunk_job: Option <Job> =
None;
let mut next_chunk_jobs: LinkedList <Job> =
LinkedList::new ();
let mut future_chunk_job: Option <Job> =
None;
let mut eof = false;
loop {
if future_chunk_job.is_none () && ! eof {
if (
protobuf_result (
coded_input_stream.eof (),
) ?
) {
eof = true;
} else {
let backup_instruction =
DiskBackupInstruction::read (
& mut coded_input_stream,
) ?;
future_chunk_job = Some (
self.follow_instruction_async_async (
debug,
& backup_instruction,
).map (
|future_chunk_data|
JobTarget::FutureChunk (
future_chunk_data)
).boxed ()
);
}
}
if current_chunk_job.is_none () {
current_chunk_job =
next_chunk_jobs.pop_front ();
}
let have_current_chunk_job =
current_chunk_job.is_some ();
let have_future_chunk_job =
future_chunk_job.is_some ();
if (
! have_current_chunk_job
&& ! have_future_chunk_job
) {
break;
}
let completed_job_target =
match futures::select_all (vec! [
current_chunk_job.unwrap_or_else (
|| futures::empty ().boxed ()),
future_chunk_job.unwrap_or_else (
|| futures::empty ().boxed ()),
]).wait () {
Ok ((value, 0, remaining_future)) => {
future_chunk_job =
if have_future_chunk_job {
Some (
remaining_future.into_iter ()
.next ()
.unwrap ()
.boxed ()
)
} else { None };
current_chunk_job = None;
value
},
Ok ((value, 1, remaining_future)) => {
current_chunk_job =
if have_current_chunk_job {
Some (
remaining_future.into_iter ()
.next ()
.unwrap ()
.boxed ()
)
} else { None };
future_chunk_job = None;
value
},
Ok ((_, _, _)) =>
panic! ("Not possible"),
Err ((error, _, _)) =>
return Err (error),
};
match completed_job_target {
JobTarget::Chunk (chunk_data) => {
digest.input (
& chunk_data);
io_result (
target.write (
& chunk_data)
) ?;
progress (
count);
count += 1;
},
JobTarget::FutureChunk (future_chunk) => {
next_chunk_jobs.push_back (
future_chunk.map (
|chunk_data|
JobTarget::Chunk (
chunk_data)
).boxed ()
);
},
};
}
Ok (())
}
/// Fetches a chunk's data by id, blocking until it is available.
pub fn get_chunk (
    & self,
    chunk_id: ChunkId,
) -> Result <ChunkData, String> {

    self.get_chunk_debug (
        & output::null (),
        chunk_id)

}

#[ doc (hidden) ]
/// As [`Repository::get_chunk`], with debug output.
pub fn get_chunk_debug (
    & self,
    debug: & Output,
    chunk_id: ChunkId,
) -> Result <ChunkData, String> {

    let chunk_future =
        self.get_chunk_async_debug (
            debug,
            chunk_id);

    chunk_future.wait ()

}

/// Fetches a chunk's data by id, returning a future.
pub fn get_chunk_async (
    & self,
    chunk_id: ChunkId,
) -> BoxFuture <ChunkData, String> {

    self.get_chunk_async_debug (
        & output::null (),
        chunk_id)

}
#[ doc (hidden) ]
/// As [`Repository::get_chunk_async`], with debug output.
///
/// NOTE(review): the inner future is resolved with a blocking `wait ()`
/// inside `and_then`, so polling the returned future blocks the polling
/// thread until the chunk data is available — confirm this is intended.
pub fn get_chunk_async_debug (
& self,
debug: & Output,
chunk_id: ChunkId,
) -> BoxFuture <ChunkData, String> {
self.get_chunk_async_async_debug (
debug,
chunk_id,
).and_then (
|future|
future.wait ()
).boxed ()
}
/// Fetches a chunk as a two-stage future: the outer future resolves when
/// the load is scheduled, the inner one when the data is available.
pub fn get_chunk_async_async (
    & self,
    chunk_id: ChunkId,
) -> BoxFuture <BoxFuture <ChunkData, String>, String> {

    let null_output =
        output::null ();

    self.get_chunk_async_async_debug (
        & null_output,
        chunk_id)

}
#[ doc (hidden) ]
/// As [`Repository::get_chunk_async_async`], with debug output.
///
/// # Panics
///
/// Panics if the indexes have not been loaded.
pub fn get_chunk_async_async_debug (
& self,
debug: & Output,
chunk_id: ChunkId,
) -> BoxFuture <BoxFuture <ChunkData, String>, String> {
let mut self_state =
self.state.lock ().unwrap ();
if ! self_state.index_cache.loaded () {
panic! (
"Must load indexes before getting chunks");
}
let debug_clone =
debug.clone ();
// Fast path: the chunk cache can already supply a future for this chunk.
if let Some (chunk_data_future) =
self.chunk_cache.get (
debug,
& chunk_id,
) {
let self_clone =
self.clone ();
// NOTE(review): the or_else branch below can never run, because the
// preceding future is `futures::done (Ok (..))` and so never fails; it
// appears to be a vestigial fallback — confirm before removing.
return futures::done (
Ok (chunk_data_future),
).or_else (
move |_error: String| {
let mut self_state =
self_clone.state.lock ().unwrap ();
self_clone.load_chunk_async_async (
& debug_clone,
self_state.deref_mut (),
chunk_id)
}).boxed ();
}
// Slow path: schedule a bundle load (the state lock is still held here).
self.load_chunk_async_async (
debug,
self_state.deref_mut (),
chunk_id)
}
/// Schedules a load of the bundle containing `chunk_id`, looked up in the
/// index cache; fails immediately if the chunk is unknown.
fn load_chunk_async_async (
    & self,
    debug: & Output,
    self_state: & mut RepositoryState,
    chunk_id: ChunkId,
) -> BoxFuture <BoxFuture <ChunkData, String>, String> {

    // Guard clause: unknown chunks fail without scheduling anything.
    let bundle_id =
        match self_state.index_cache.get (& chunk_id) {

        Some (index_entry) =>
            index_entry.bundle_id (),

        None =>
            return futures::failed (
                format! (
                    "Missing chunk: {}",
                    chunk_id),
            ).boxed (),

    };

    self.load_chunk_async_async_impl (
        debug,
        self_state,
        chunk_id,
        bundle_id,
    )

}
/// Schedules an asynchronous load of `bundle_id` and extracts `chunk_id`
/// from the loaded bundle, populating the chunk cache along the way.
fn load_chunk_async_async_impl (
& self,
_debug: & Output,
self_state: & mut RepositoryState,
chunk_id: ChunkId,
bundle_id: BundleId,
) -> ChunkDoubleFuture {
// Record that this bundle's chunks are wanted, so the first completion
// below bulk-inserts them into the cache.
self_state.bundles_needed.insert (
bundle_id);
let self_clone =
self.clone ();
self.bundle_loader.load_bundle_async_async (
& output::null (),
bundle_id,
).map (
move |chunk_map_future: BoxFuture <ChunkMap, String>|
chunk_map_future.then (
move |chunk_map_result: Result <ChunkMap, String>| {
chunk_map_result.map (
move |chunk_map| {
// This shadows the `self_state` parameter: the future runs later,
// so the lock must be re-taken.
let mut self_state =
self_clone.state.lock ().unwrap ();
// First completion for this bundle: insert every chunk it holds.
// NOTE(review): the boolean insert flag (false here, true below for
// the requested chunk) is defined by ChunkCache::insert — confirm
// its meaning there.
if self_state.bundles_needed.remove (
& bundle_id) {
for (chunk_id, chunk_data)
in chunk_map.iter () {
self_clone.chunk_cache.insert (
* chunk_id,
chunk_data.clone (),
false,
) ?;
}
}
if let Some (chunk_data) =
chunk_map.get (& chunk_id) {
self_clone.chunk_cache.insert (
chunk_id,
chunk_data.clone (),
true,
) ?;
Ok (chunk_data.clone ())
} else {
Err (
format! (
"Chunk not found: {}",
chunk_id)
)
}
// Flatten the Result<Result<..>> produced by mapping a fallible
// closure over the result.
}).and_then (
|result| result
)
}).boxed ()
).boxed ()
}
/// Looks up the index entry for `chunk_id`, returning a copy.
///
/// # Panics
///
/// Panics if the indexes have not been loaded.
pub fn get_index_entry (
    & self,
    chunk_id: ChunkId,
) -> Result <IndexEntry, String> {

    let state_guard =
        self.state.lock ().unwrap ();

    if ! state_guard.index_cache.loaded () {
        panic! (
            "Must load indexes before getting index entries");
    }

    state_guard.index_cache.get (
        & chunk_id,
    ).map (
        |value| value.clone ()
    ).ok_or_else (
        || format! (
            "Missing chunk: {}",
            chunk_id))

}
/// Returns true if the loaded indexes contain `chunk_id`.
///
/// # Panics
///
/// Panics if the indexes have not been loaded.
pub fn has_chunk (
    & self,
    chunk_id: & ChunkId,
) -> bool {

    let state_guard =
        self.state.lock ().unwrap ();

    if ! state_guard.index_cache.loaded () {
        panic! (
            "Must load indexes before getting index entries");
    }

    state_guard.index_cache.get (chunk_id).is_some ()

}
/// Opens the named backup for random-access reads.
pub fn open_backup (
    & self,
    output: & Output,
    backup_name: & str,
) -> Result <RandomAccess, String> {

    RandomAccess::new (output, self, backup_name)

}
/// Consumes and closes this repository handle, reporting on `output`.
pub fn close (
    self,
    output: & Output,
) {

    let output_job =
        output_job_start! (
            output,
            "Closing repository");

    // Dropping self releases this handle's share of the caches, loader
    // and thread pool.
    drop (self);

    output_job.complete ();

}
#[ inline ]
/// Returns the configuration the repository was opened with.
pub fn config (
    & self,
) -> & RepositoryConfig {
    & self.data.config
}

#[ inline ]
/// Returns the shared repository core.
pub fn core (& self) -> & RepositoryCore {
    & self.data.core
}

#[ inline ]
/// Returns the repository's filesystem path.
pub fn path (& self) -> & Path {
    // The inner call already returns a reference; the extra `&` the
    // original took here was redundant (clippy: needless_borrow).
    self.data.core.path ()
}

#[ inline ]
/// Returns the repository's storage info record.
pub fn storage_info (& self) -> & DiskStorageInfo {
    // Redundant `&` removed, as in path () above.
    self.data.core.storage_info ()
}

#[ inline ]
/// Returns the repository's encryption key, if it has one.
pub fn encryption_key (& self) -> Option <EncryptionKey> {
    self.data.core.encryption_key ()
}
#[ inline ]
/// Returns the filesystem path of the index file with the given id.
pub fn index_path (
    & self,
    index_id: IndexId,
) -> PathBuf {
    self.data.core.index_path (index_id)
}

#[ inline ]
/// Returns the filesystem path of the bundle file with the given id.
pub fn bundle_path (
    & self,
    bundle_id: BundleId,
) -> PathBuf {
    self.data.core.bundle_path (bundle_id)
}
/// Returns a snapshot of the bundle loader and chunk cache statistics.
pub fn status (
    & self,
) -> RepositoryStatus {

    let bundle_loader_status =
        self.bundle_loader.status ();

    let chunk_cache_status =
        self.chunk_cache.status ();

    RepositoryStatus {
        bundle_loader: bundle_loader_status,
        chunk_cache: chunk_cache_status,
    }

}
}