use crate::manifest::StoredArchive;
use crate::repository::backend::{
backend_to_object, Backend, BackendObject, Index, Manifest, Result, SegmentDescriptor,
};
use crate::repository::{Chunk, ChunkID, ChunkSettings, EncryptedKey};
use async_trait::async_trait;
use chrono::prelude::*;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::collections::HashSet;
use tokio::task;
/// Synchronous (blocking) view of a repository manifest.
///
/// `BackendHandle` answers each async `Manifest` call by invoking the
/// same-named method of this trait on its worker task, so every method here
/// is an ordinary blocking call taking `&mut self`.
pub trait SyncManifest: Send + std::fmt::Debug {
/// Iterator over `StoredArchive`s produced by `archive_iterator`.
///
/// It is handed back to the caller through a channel, hence the
/// `Send + 'static` bounds.
type Iterator: Iterator<Item = StoredArchive> + std::fmt::Debug + Send + 'static;
/// Returns the manifest's last-modification timestamp, or an error.
fn last_modification(&mut self) -> Result<DateTime<FixedOffset>>;
/// Returns the `ChunkSettings` held by the manifest.
fn chunk_settings(&mut self) -> ChunkSettings;
/// Returns an iterator over the archives stored in the manifest.
// NOTE(review): iteration order is not specified here — confirm with implementors.
fn archive_iterator(&mut self) -> Self::Iterator;
/// Stores `settings` as the manifest's chunk settings.
fn write_chunk_settings(&mut self, settings: ChunkSettings) -> Result<()>;
/// Adds `archive` to the manifest.
fn write_archive(&mut self, archive: StoredArchive) -> Result<()>;
/// Presumably refreshes the last-modification timestamp without other
/// changes — confirm against implementors.
fn touch(&mut self) -> Result<()>;
}
/// Synchronous (blocking) view of a repository index.
///
/// `BackendHandle` services the async `Index` trait by calling these methods
/// on its worker task. Note that the async `Index::count_chunk` is forwarded
/// to `chunk_count` here.
pub trait SyncIndex: Send + std::fmt::Debug {
/// Returns the `SegmentDescriptor` recorded for `id`, or `None`
/// (presumably when the chunk is unknown to the index).
fn lookup_chunk(&mut self, id: ChunkID) -> Option<SegmentDescriptor>;
/// Records `location` as the storage location of chunk `id`.
fn set_chunk(&mut self, id: ChunkID, location: SegmentDescriptor) -> Result<()>;
/// Returns the set of chunk IDs known to the index.
fn known_chunks(&mut self) -> HashSet<ChunkID>;
/// Commits the index.
// NOTE(review): presumably flushes pending changes to persistent storage — confirm.
fn commit_index(&mut self) -> Result<()>;
/// Returns the number of chunks in the index.
fn chunk_count(&mut self) -> usize;
}
/// Synchronous (blocking) storage backend.
///
/// A value implementing this trait can be wrapped in a `BackendHandle`, which
/// moves it onto a worker task and exposes it through the async `Backend`,
/// `Manifest` and `Index` traits. The `'static + Send` bounds are what allow
/// the value to be moved into that spawned task.
pub trait SyncBackend: 'static + Send + std::fmt::Debug {
/// Manifest implementation owned by this backend.
type SyncManifest: SyncManifest + 'static;
/// Index implementation owned by this backend.
type SyncIndex: SyncIndex + 'static;
/// Mutable access to the backend's index; used by the worker task for
/// every index command.
fn get_index(&mut self) -> &mut Self::SyncIndex;
/// Mutable access to the backend's manifest; used by the worker task for
/// every manifest command.
fn get_manifest(&mut self) -> &mut Self::SyncManifest;
/// Stores the encrypted repository key.
fn write_key(&mut self, key: EncryptedKey) -> Result<()>;
/// Retrieves the encrypted repository key.
fn read_key(&mut self) -> Result<EncryptedKey>;
/// Reads the chunk stored at `location`.
fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk>;
/// Writes `chunk` and returns the descriptor of where it was stored.
fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor>;
}
/// Requests forwarded to the backend's `SyncIndex`.
///
/// Each variant carries the call's arguments followed by the oneshot sender
/// on which the worker task returns the result.
enum SyncIndexCommand {
/// Forwarded to `SyncIndex::lookup_chunk`.
Lookup(ChunkID, oneshot::Sender<Option<SegmentDescriptor>>),
/// Forwarded to `SyncIndex::set_chunk`.
Set(ChunkID, SegmentDescriptor, oneshot::Sender<Result<()>>),
/// Forwarded to `SyncIndex::known_chunks`.
KnownChunks(oneshot::Sender<HashSet<ChunkID>>),
/// Forwarded to `SyncIndex::commit_index`.
Commit(oneshot::Sender<Result<()>>),
/// Forwarded to `SyncIndex::chunk_count`.
Count(oneshot::Sender<usize>),
}
/// Requests forwarded to the backend's `SyncManifest`.
///
/// `I` is the manifest's archive iterator type. Each variant carries the
/// call's arguments followed by the oneshot sender on which the worker task
/// returns the result.
enum SyncManifestCommand<I> {
/// Forwarded to `SyncManifest::last_modification`.
LastMod(oneshot::Sender<Result<DateTime<FixedOffset>>>),
/// Forwarded to `SyncManifest::chunk_settings`.
ChunkSettings(oneshot::Sender<ChunkSettings>),
/// Forwarded to `SyncManifest::archive_iterator`.
ArchiveIterator(oneshot::Sender<I>),
/// Forwarded to `SyncManifest::write_chunk_settings`.
WriteChunkSettings(ChunkSettings, oneshot::Sender<Result<()>>),
/// Forwarded to `SyncManifest::write_archive`.
WriteArchive(StoredArchive, oneshot::Sender<Result<()>>),
/// Forwarded to `SyncManifest::touch`.
Touch(oneshot::Sender<Result<()>>),
}
/// Requests handled by the `SyncBackend` itself rather than by its index or
/// manifest.
///
/// Each variant carries the call's arguments followed by the oneshot sender
/// on which the worker task returns the result.
enum SyncBackendCommand {
/// Forwarded to `SyncBackend::read_chunk`.
ReadChunk(SegmentDescriptor, oneshot::Sender<Result<Chunk>>),
/// Forwarded to `SyncBackend::write_chunk`.
WriteChunk(Chunk, oneshot::Sender<Result<SegmentDescriptor>>),
/// Forwarded to `SyncBackend::read_key`.
ReadKey(oneshot::Sender<Result<EncryptedKey>>),
/// Forwarded to `SyncBackend::write_key`.
WriteKey(EncryptedKey, oneshot::Sender<Result<()>>),
/// Tells the worker task to stop processing commands; the sender is
/// signalled once the backend has been dropped.
Close(oneshot::Sender<()>),
}
/// Top-level message sent from a `BackendHandle` to its worker task.
///
/// `I` is the archive iterator type of the backend's manifest.
enum SyncCommand<I> {
/// A request for the backend's index.
Index(SyncIndexCommand),
/// A request for the backend's manifest.
Manifest(SyncManifestCommand<I>),
/// A request for the backend itself (chunk/key I/O or shutdown).
Backend(SyncBackendCommand),
}
/// Async handle to a `SyncBackend` that lives on a spawned worker task.
///
/// The handle only holds the sending half of the command channel, so clones
/// of it all talk to the same worker and the same backend.
pub struct BackendHandle<B: SyncBackend> {
// Sending half of the command queue consumed by the worker task.
channel:
mpsc::Sender<SyncCommand<<<B as SyncBackend>::SyncManifest as SyncManifest>::Iterator>>,
}
impl<B> BackendHandle<B>
where
    B: SyncBackend + Send + 'static,
{
    /// Moves `backend` onto a spawned task and returns a handle for talking to it.
    ///
    /// The worker task pulls commands off a bounded channel (capacity 100) one
    /// at a time and executes each inside `task::block_in_place`, so the
    /// synchronous backend is allowed to block. The result of every command is
    /// returned over the oneshot sender carried by that command.
    ///
    /// If the requester is no longer waiting for a reply (its receiving future
    /// was dropped, e.g. because it was cancelled), the reply is discarded. The
    /// worker keeps running so that other handles are unaffected.
    ///
    /// On `Close` the worker stops processing, drops the backend and the
    /// command receiver, and only then acknowledges the close request. Commands
    /// still queued at that point are dropped unanswered.
    ///
    /// NOTE(review): `task::spawn` and `task::block_in_place` are tokio APIs;
    /// per tokio's documentation they need a running runtime that supports
    /// blocking in place — confirm for the tokio version in use.
    pub fn new(mut backend: B) -> Self {
        // Delivers `value` to the requester. `send` fails only when the
        // receiving half has already been dropped, in which case nobody is
        // interested in the answer and it is safe to discard it.
        fn reply<T>(ret: oneshot::Sender<T>, value: T) {
            let _ = ret.send(value);
        }

        let (input, mut output) = mpsc::channel(100);
        task::spawn(async move {
            // Set once a `Close` command has been seen; holds the sender used
            // to acknowledge the shutdown.
            let mut final_ret: Option<oneshot::Sender<()>> = None;
            while let Some(command) = output.next().await {
                task::block_in_place(|| match command {
                    SyncCommand::Index(index_command) => {
                        let index = backend.get_index();
                        match index_command {
                            SyncIndexCommand::Lookup(id, ret) => {
                                reply(ret, index.lookup_chunk(id));
                            }
                            SyncIndexCommand::Set(id, location, ret) => {
                                reply(ret, index.set_chunk(id, location));
                            }
                            SyncIndexCommand::KnownChunks(ret) => {
                                reply(ret, index.known_chunks());
                            }
                            SyncIndexCommand::Commit(ret) => {
                                reply(ret, index.commit_index());
                            }
                            SyncIndexCommand::Count(ret) => {
                                reply(ret, index.chunk_count());
                            }
                        }
                    }
                    SyncCommand::Manifest(manifest_command) => {
                        let manifest = backend.get_manifest();
                        match manifest_command {
                            SyncManifestCommand::LastMod(ret) => {
                                reply(ret, manifest.last_modification());
                            }
                            SyncManifestCommand::ChunkSettings(ret) => {
                                reply(ret, manifest.chunk_settings());
                            }
                            SyncManifestCommand::ArchiveIterator(ret) => {
                                reply(ret, manifest.archive_iterator());
                            }
                            SyncManifestCommand::WriteChunkSettings(settings, ret) => {
                                reply(ret, manifest.write_chunk_settings(settings));
                            }
                            SyncManifestCommand::WriteArchive(archive, ret) => {
                                reply(ret, manifest.write_archive(archive));
                            }
                            SyncManifestCommand::Touch(ret) => {
                                reply(ret, manifest.touch());
                            }
                        }
                    }
                    SyncCommand::Backend(backend_command) => match backend_command {
                        SyncBackendCommand::ReadChunk(location, ret) => {
                            reply(ret, backend.read_chunk(location));
                        }
                        SyncBackendCommand::WriteChunk(chunk, ret) => {
                            reply(ret, backend.write_chunk(chunk));
                        }
                        SyncBackendCommand::WriteKey(key, ret) => {
                            reply(ret, backend.write_key(key));
                        }
                        SyncBackendCommand::ReadKey(ret) => {
                            reply(ret, backend.read_key());
                        }
                        SyncBackendCommand::Close(ret) => {
                            // Defer the acknowledgement until the backend has
                            // actually been dropped below.
                            final_ret = Some(ret);
                        }
                    },
                });
                if final_ret.is_some() {
                    break;
                }
            }
            // Release the backend and the queue before acknowledging, so the
            // closer only resumes once the backend is really gone.
            std::mem::drop(backend);
            std::mem::drop(output);
            if let Some(ret) = final_ret {
                reply(ret, ());
            }
        });
        BackendHandle { channel: input }
    }
}
impl<B: SyncBackend> std::fmt::Debug for BackendHandle<B> {
    /// Writes a fixed placeholder; the handle exposes no inspectable state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Opaque Backend Handle")
    }
}
impl<B: SyncBackend> Clone for BackendHandle<B> {
    /// Produces another handle feeding the same command channel, and therefore
    /// the same worker task.
    fn clone(&self) -> Self {
        let channel = self.channel.clone();
        Self { channel }
    }
}
/// Async manifest operations, each forwarded to the worker task as a
/// `SyncManifestCommand` and answered over a oneshot channel.
///
/// Every method panics if the command cannot be queued. Methods returning
/// `Result` report a dropped reply channel as an error; the others panic on it.
#[async_trait]
impl<B: SyncBackend> Manifest for BackendHandle<B> {
    type Iterator = <<B as SyncBackend>::SyncManifest as SyncManifest>::Iterator;

    /// Asks the worker for the manifest's last-modification timestamp.
    async fn last_modification(&mut self) -> Result<DateTime<FixedOffset>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Manifest(SyncManifestCommand::LastMod(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }

    /// Asks the worker for the manifest's chunk settings.
    async fn chunk_settings(&mut self) -> ChunkSettings {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Manifest(SyncManifestCommand::ChunkSettings(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await.unwrap()
    }

    /// Asks the worker for an iterator over the manifest's archives.
    async fn archive_iterator(&mut self) -> Self::Iterator {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Manifest(SyncManifestCommand::ArchiveIterator(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await.unwrap()
    }

    /// Asks the worker to store `settings` in the manifest.
    async fn write_chunk_settings(&mut self, settings: ChunkSettings) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request =
            SyncCommand::Manifest(SyncManifestCommand::WriteChunkSettings(settings, reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }

    /// Asks the worker to add `archive` to the manifest.
    async fn write_archive(&mut self, archive: StoredArchive) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Manifest(SyncManifestCommand::WriteArchive(archive, reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }

    /// Asks the worker to call `touch` on the manifest.
    async fn touch(&mut self) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Manifest(SyncManifestCommand::Touch(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }
}
/// Async index operations, each forwarded to the worker task as a
/// `SyncIndexCommand` and answered over a oneshot channel.
///
/// Every method panics if the command cannot be queued. Methods returning
/// `Result` report a dropped reply channel as an error; the others panic on it.
#[async_trait]
impl<B: SyncBackend> Index for BackendHandle<B> {
    /// Asks the worker where chunk `id` is stored.
    async fn lookup_chunk(&mut self, id: ChunkID) -> Option<SegmentDescriptor> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Index(SyncIndexCommand::Lookup(id, reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await.unwrap()
    }

    /// Asks the worker to record `location` for chunk `id`.
    async fn set_chunk(&mut self, id: ChunkID, location: SegmentDescriptor) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Index(SyncIndexCommand::Set(id, location, reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }

    /// Asks the worker for the set of chunk IDs known to the index.
    async fn known_chunks(&mut self) -> HashSet<ChunkID> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Index(SyncIndexCommand::KnownChunks(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await.unwrap()
    }

    /// Asks the worker to commit the index.
    async fn commit_index(&mut self) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Index(SyncIndexCommand::Commit(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await?
    }

    /// Asks the worker for the index's chunk count (`SyncIndex::chunk_count`).
    async fn count_chunk(&mut self) -> usize {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SyncCommand::Index(SyncIndexCommand::Count(reply_tx));
        self.channel.send(request).await.unwrap();
        reply_rx.await.unwrap()
    }
}
/// Async backend operations, each forwarded to the worker task as a
/// `SyncBackendCommand` and answered over a oneshot channel.
///
/// The handle acts as its own manifest and index: `get_manifest` and
/// `get_index` simply return clones that share the same worker.
///
/// Every async method panics if the command cannot be queued. Methods
/// returning `Result` report a dropped reply channel as an error.
#[async_trait]
impl<B: SyncBackend> Backend for BackendHandle<B> {
    type Manifest = Self;
    type Index = Self;

    /// Returns a clone of this handle to be used through the `Index` trait.
    fn get_index(&self) -> Self::Index {
        self.clone()
    }

    /// Asks the worker to store a copy of `key`.
    ///
    /// A reply channel dropped by the worker is reported as an error, matching
    /// `read_key` and the other `Result`-returning methods, rather than a panic.
    async fn write_key(&self, key: &EncryptedKey) -> Result<()> {
        // Sending needs `&mut`, but only `&self` is available here, so send
        // through a clone of the channel.
        let mut channel = self.channel.clone();
        let (i, o) = oneshot::channel();
        channel
            .send(SyncCommand::Backend(SyncBackendCommand::WriteKey(
                key.clone(),
                i,
            )))
            .await
            .unwrap();
        o.await?
    }

    /// Asks the worker to read the encrypted key.
    async fn read_key(&self) -> Result<EncryptedKey> {
        // See `write_key` for why the channel is cloned.
        let mut channel = self.channel.clone();
        let (i, o) = oneshot::channel();
        channel
            .send(SyncCommand::Backend(SyncBackendCommand::ReadKey(i)))
            .await
            .unwrap();
        o.await?
    }

    /// Returns a clone of this handle to be used through the `Manifest` trait.
    fn get_manifest(&self) -> Self::Manifest {
        self.clone()
    }

    /// Asks the worker to read the chunk stored at `location`.
    async fn read_chunk(&mut self, location: SegmentDescriptor) -> Result<Chunk> {
        let (i, o) = oneshot::channel();
        self.channel
            .send(SyncCommand::Backend(SyncBackendCommand::ReadChunk(
                location, i,
            )))
            .await
            .unwrap();
        o.await?
    }

    /// Asks the worker to write `chunk`, returning where it was stored.
    async fn write_chunk(&mut self, chunk: Chunk) -> Result<SegmentDescriptor> {
        let (i, o) = oneshot::channel();
        self.channel
            .send(SyncCommand::Backend(SyncBackendCommand::WriteChunk(
                chunk, i,
            )))
            .await
            .unwrap();
        o.await?
    }

    /// Asks the worker task to shut down and waits for its acknowledgement,
    /// which is sent after the backend has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if the command cannot be queued or if the acknowledgement
    /// channel is dropped without a reply (for example when the worker has
    /// already stopped).
    async fn close(&mut self) {
        let (i, o) = oneshot::channel();
        self.channel
            .send(SyncCommand::Backend(SyncBackendCommand::Close(i)))
            .await
            .unwrap();
        o.await.unwrap()
    }

    /// Wraps a clone of this handle in a `BackendObject` via `backend_to_object`.
    fn get_object_handle(&self) -> BackendObject {
        backend_to_object(self.clone())
    }
}