use std::collections::HashMap;
use anybytes::Bytes;
use ed25519_dalek::SigningKey;
use iroh_base::EndpointId;
use triblespace_core::blob::{BlobEncoding, IntoBlob};
use triblespace_core::blob::encodings::UnknownBlob;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::id::Id;
use triblespace_core::repo::{
BlobStore, BlobStoreList, BlobStorePut, BranchStore, PushResult,
};
use triblespace_core::inline::Inline;
use triblespace_core::inline::InlineEncoding;
use triblespace_core::inline::encodings::hash::Handle;
use crate::channel::NetEvent;
use crate::host::{self, NetReceiver, NetSender, StoreSnapshot};
use crate::protocol::RawHash;
pub use crate::host::{PeerConfig, SyncDirection};
pub struct Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
store: S,
sender: NetSender,
receiver: NetReceiver,
last_blob_reader: Option<S::Reader>,
last_branches: HashMap<Id, RawHash>,
direction: SyncDirection,
last_event_at: std::time::Instant,
}
impl<S> Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
pub fn new(mut store: S, key: SigningKey, config: PeerConfig) -> Self {
let direction = config.direction;
let (sender, receiver) = host::spawn(key, config);
if let Some(snap) = StoreSnapshot::from_store(&mut store) {
sender.update_snapshot(snap);
}
if direction != SyncDirection::ReadOnly {
if let Ok(reader) = store.reader() {
use triblespace_core::repo::BlobStoreList;
for handle in reader.blobs().filter_map(Result::ok) {
sender.announce(handle.raw);
}
}
}
let last_blob_reader = store.reader().ok();
Peer {
store,
sender,
receiver,
last_blob_reader,
last_branches: HashMap::new(),
direction,
last_event_at: std::time::Instant::now(),
}
}
pub fn last_event_at(&self) -> std::time::Instant {
self.last_event_at
}
pub fn direction(&self) -> SyncDirection {
self.direction
}
pub fn id(&self) -> EndpointId {
self.sender.id()
}
pub fn refresh(&mut self) {
while let Some(event) = self.receiver.try_recv() {
self.last_event_at = std::time::Instant::now();
if self.direction == SyncDirection::WriteOnly {
continue;
}
match event {
NetEvent::Blob(data) => {
let bytes: Bytes = data.into();
let _ = self.store.put::<UnknownBlob, Bytes>(bytes);
}
NetEvent::Head { branch, head, publisher } => {
if let Some(remote_id) = Id::new(branch) {
if let Some(name) = read_remote_name(&mut self.store, &head) {
crate::tracking::ensure_tracking_branch(
&mut self.store,
remote_id,
&head,
&name,
&publisher,
);
}
}
}
}
}
if let Ok(current) = self.store.reader() {
if let Some(baseline) = self.last_blob_reader.as_ref() {
if self.direction != SyncDirection::ReadOnly {
for handle in current.blobs_diff(baseline).flatten() {
self.sender.announce(handle.raw);
}
}
}
self.last_blob_reader = Some(current);
}
if self.direction != SyncDirection::ReadOnly {
let bids: Vec<Id> = match self.store.branches() {
Ok(it) => it.filter_map(|r| r.ok()).collect(),
Err(_) => return,
};
for bid in bids {
if crate::tracking::is_tracking_branch(&mut self.store, bid) {
continue;
}
let head = match self.store.head(bid) {
Ok(Some(h)) => h,
_ => continue,
};
if self.last_branches.get(&bid) != Some(&head.raw) {
let bid_bytes: [u8; 16] = bid.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(bid, head.raw);
}
}
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
pub fn republish_branches(&mut self) {
if self.direction == SyncDirection::ReadOnly {
return;
}
let bids: Vec<Id> = match self.store.branches() {
Ok(it) => it.filter_map(|r| r.ok()).collect(),
Err(_) => return,
};
for bid in bids {
if crate::tracking::is_tracking_branch(&mut self.store, bid) {
continue;
}
if let Ok(Some(head)) = self.store.head(bid) {
let bid_bytes: [u8; 16] = bid.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(bid, head.raw);
}
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
pub fn store(&self) -> &S {
&self.store
}
pub fn store_mut(&mut self) -> &mut S {
&mut self.store
}
pub fn into_store(self) -> S {
self.store
}
}
impl<S> BlobStorePut for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type PutError = S::PutError;
fn put<Sch, T>(&mut self, item: T) -> Result<Inline<Handle<Sch>>, Self::PutError>
where
Sch: BlobEncoding + 'static,
T: IntoBlob<Sch>,
Handle<Sch>: InlineEncoding,
{
let handle = self.store.put(item)?;
if self.direction != SyncDirection::ReadOnly {
self.sender.announce(handle.raw);
}
self.last_blob_reader = self.store.reader().ok();
Ok(handle)
}
}
impl<S> BlobStore for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type Reader = S::Reader;
type ReaderError = S::ReaderError;
fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError> {
self.refresh();
self.store.reader()
}
}
impl<S> BranchStore for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type BranchesError = S::BranchesError;
type HeadError = S::HeadError;
type UpdateError = S::UpdateError;
type ListIter<'a> = S::ListIter<'a> where S: 'a;
fn branches<'a>(&'a mut self) -> Result<Self::ListIter<'a>, Self::BranchesError> {
self.refresh();
self.store.branches()
}
fn head(
&mut self,
id: Id,
) -> Result<Option<Inline<Handle<SimpleArchive>>>, Self::HeadError> {
self.refresh();
self.store.head(id)
}
fn update(
&mut self,
id: Id,
old: Option<Inline<Handle<SimpleArchive>>>,
new: Option<Inline<Handle<SimpleArchive>>>,
) -> Result<PushResult, Self::UpdateError> {
let result = self.store.update(id, old, new.clone())?;
if let PushResult::Success() = &result {
if let Some(head) = new {
if !crate::tracking::is_tracking_branch(&mut self.store, id)
&& self.direction != SyncDirection::ReadOnly
{
let bid_bytes: [u8; 16] = id.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(id, head.raw);
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
}
Ok(result)
}
}
fn read_remote_name<S: BlobStore>(store: &mut S, head_hash: &RawHash) -> Option<String> {
use triblespace_core::blob::encodings::longstring::LongString;
use triblespace_core::repo::BlobStoreGet;
use triblespace_core::macros::{find, pattern};
let reader = store.reader().ok()?;
let meta_handle = Inline::<Handle<SimpleArchive>>::new(*head_hash);
let meta: triblespace_core::trible::TribleSet = reader.get(meta_handle).ok()?;
let name_handle: Inline<Handle<LongString>> = find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ triblespace_core::metadata::name: ?h }])
)
.next()
.or_else(|| {
find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ crate::tracking::remote_name: ?h }])
)
.next()
})?;
let name_view: anybytes::View<str> = reader.get(name_handle).ok()?;
Some(name_view.as_ref().to_string())
}