use std::collections::HashMap;
use anybytes::Bytes;
use ed25519_dalek::SigningKey;
use iroh_base::EndpointId;
use triblespace_core::blob::{BlobEncoding, IntoBlob};
use triblespace_core::blob::encodings::UnknownBlob;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::id::Id;
use triblespace_core::repo::{
BlobStore, BlobStoreList, BlobStorePut, BranchStore, PushResult,
};
use triblespace_core::inline::Inline;
use triblespace_core::inline::InlineEncoding;
use triblespace_core::inline::encodings::hash::Handle;
use crate::channel::NetEvent;
use crate::host::{self, NetReceiver, NetSender, StoreSnapshot};
use crate::protocol::{RawBranchId, RawHash};
pub use crate::host::PeerConfig;
pub struct Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
store: S,
sender: NetSender,
receiver: NetReceiver,
last_blob_reader: Option<S::Reader>,
last_branches: HashMap<Id, RawHash>,
}
impl<S> Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
pub fn new(mut store: S, key: SigningKey, config: PeerConfig) -> Self {
let (sender, receiver) = host::spawn(key, config);
if let Some(snap) = StoreSnapshot::from_store(&mut store) {
sender.update_snapshot(snap);
}
let last_blob_reader = store.reader().ok();
Peer {
store,
sender,
receiver,
last_blob_reader,
last_branches: HashMap::new(),
}
}
pub fn id(&self) -> EndpointId {
self.sender.id()
}
pub fn track(&self, peer: EndpointId, branch: RawBranchId) {
self.sender.track(peer, branch);
}
pub fn pull_branch(
&mut self,
remote: EndpointId,
name: &str,
) -> anyhow::Result<Id> {
let (remote_id, _head) = resolve_branch_name(self, remote, name)?
.ok_or_else(|| anyhow::anyhow!("branch '{name}' not found on remote"))?;
let branch_bytes: [u8; 16] = remote_id.into();
self.track(remote, branch_bytes);
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
if let Some(id) = crate::tracking::find_tracking_branch(self, remote_id) {
return Ok(id);
}
if std::time::Instant::now() > deadline {
return Err(anyhow::anyhow!("timed out waiting for remote HEAD"));
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
pub fn list_remote_branches(
&self,
peer: EndpointId,
) -> anyhow::Result<Vec<(Id, RawHash)>> {
self.sender.list_remote_branches(peer)
}
pub fn head_of_remote(
&mut self,
peer: EndpointId,
branch: RawBranchId,
) -> anyhow::Result<Option<RawHash>> {
self.sender.head_of_remote(peer, branch)
}
pub fn fetch<T, Sch>(
&mut self,
peer: EndpointId,
handle: Inline<Handle<Sch>>,
) -> anyhow::Result<Option<T>>
where
Sch: BlobEncoding + 'static,
T: triblespace_core::blob::TryFromBlob<Sch>,
Handle<Sch>: InlineEncoding,
{
let Some(bytes) = self.sender.fetch(peer, handle.raw)? else {
return Ok(None);
};
let data: Bytes = bytes.into();
self.store
.put::<UnknownBlob, Bytes>(data.clone())
.map_err(|_| anyhow::anyhow!("store put failed"))?;
self.last_blob_reader = self.store.reader().ok();
let blob: triblespace_core::blob::Blob<Sch> =
triblespace_core::blob::Blob::new(data);
T::try_from_blob(blob)
.map(Some)
.map_err(|_| anyhow::anyhow!("blob decode failed"))
}
pub fn refresh(&mut self) {
while let Some(event) = self.receiver.try_recv() {
match event {
NetEvent::Blob(data) => {
let bytes: Bytes = data.into();
let _ = self.store.put::<UnknownBlob, Bytes>(bytes);
}
NetEvent::Head { branch, head, publisher } => {
if let Some(remote_id) = Id::new(branch) {
if let Some(name) = read_remote_name(&mut self.store, &head) {
crate::tracking::ensure_tracking_branch(
&mut self.store,
remote_id,
&head,
&name,
&publisher,
);
}
}
}
}
}
if let Ok(current) = self.store.reader() {
if let Some(baseline) = self.last_blob_reader.as_ref() {
for handle in current.blobs_diff(baseline).flatten() {
self.sender.announce(handle.raw);
}
}
self.last_blob_reader = Some(current);
}
let bids: Vec<Id> = match self.store.branches() {
Ok(it) => it.filter_map(|r| r.ok()).collect(),
Err(_) => return,
};
for bid in bids {
if crate::tracking::is_tracking_branch(&mut self.store, bid) {
continue;
}
let head = match self.store.head(bid) {
Ok(Some(h)) => h,
_ => continue,
};
if self.last_branches.get(&bid) != Some(&head.raw) {
let bid_bytes: [u8; 16] = bid.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(bid, head.raw);
}
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
pub fn republish_branches(&mut self) {
let bids: Vec<Id> = match self.store.branches() {
Ok(it) => it.filter_map(|r| r.ok()).collect(),
Err(_) => return,
};
for bid in bids {
if crate::tracking::is_tracking_branch(&mut self.store, bid) {
continue;
}
if let Ok(Some(head)) = self.store.head(bid) {
let bid_bytes: [u8; 16] = bid.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(bid, head.raw);
}
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
pub fn store(&self) -> &S {
&self.store
}
pub fn store_mut(&mut self) -> &mut S {
&mut self.store
}
pub fn into_store(self) -> S {
self.store
}
}
impl<S> BlobStorePut for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type PutError = S::PutError;
fn put<Sch, T>(&mut self, item: T) -> Result<Inline<Handle<Sch>>, Self::PutError>
where
Sch: BlobEncoding + 'static,
T: IntoBlob<Sch>,
Handle<Sch>: InlineEncoding,
{
let handle = self.store.put(item)?;
self.sender.announce(handle.raw);
self.last_blob_reader = self.store.reader().ok();
Ok(handle)
}
}
impl<S> BlobStore for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type Reader = S::Reader;
type ReaderError = S::ReaderError;
fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError> {
self.refresh();
self.store.reader()
}
}
impl<S> BranchStore for Peer<S>
where
S: BlobStore + BlobStorePut + BranchStore,
{
type BranchesError = S::BranchesError;
type HeadError = S::HeadError;
type UpdateError = S::UpdateError;
type ListIter<'a> = S::ListIter<'a> where S: 'a;
fn branches<'a>(&'a mut self) -> Result<Self::ListIter<'a>, Self::BranchesError> {
self.refresh();
self.store.branches()
}
fn head(
&mut self,
id: Id,
) -> Result<Option<Inline<Handle<SimpleArchive>>>, Self::HeadError> {
self.refresh();
self.store.head(id)
}
fn update(
&mut self,
id: Id,
old: Option<Inline<Handle<SimpleArchive>>>,
new: Option<Inline<Handle<SimpleArchive>>>,
) -> Result<PushResult, Self::UpdateError> {
let result = self.store.update(id, old, new.clone())?;
if let PushResult::Success() = &result {
if let Some(head) = new {
if !crate::tracking::is_tracking_branch(&mut self.store, id) {
let bid_bytes: [u8; 16] = id.into();
self.sender.gossip(bid_bytes, head.raw);
self.last_branches.insert(id, head.raw);
}
if let Some(snap) = StoreSnapshot::from_store(&mut self.store) {
self.sender.update_snapshot(snap);
}
}
}
Ok(result)
}
}
pub fn resolve_branch_name<S>(
peer: &mut Peer<S>,
remote: EndpointId,
name: &str,
) -> anyhow::Result<Option<(Id, RawHash)>>
where
S: BlobStore + BlobStorePut + BranchStore,
{
use triblespace_core::blob::encodings::longstring::LongString;
use triblespace_core::macros::{find, pattern};
use triblespace_core::trible::TribleSet;
let branches = peer.list_remote_branches(remote)?;
for (id, head) in branches {
let meta_handle = Inline::<Handle<SimpleArchive>>::new(head);
let Some(meta) = peer.fetch::<TribleSet, _>(remote, meta_handle)? else {
continue;
};
let name_handles: Vec<Inline<Handle<LongString>>> = find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ triblespace_core::metadata::name: ?h }])
)
.collect();
for name_handle in name_handles {
let Some(name_view) = peer.fetch::<anybytes::View<str>, _>(remote, name_handle)? else {
continue;
};
if name_view.as_ref() == name {
return Ok(Some((id, head)));
}
}
}
Ok(None)
}
fn read_remote_name<S: BlobStore>(store: &mut S, head_hash: &RawHash) -> Option<String> {
use triblespace_core::blob::encodings::longstring::LongString;
use triblespace_core::repo::BlobStoreGet;
use triblespace_core::macros::{find, pattern};
let reader = store.reader().ok()?;
let meta_handle = Inline::<Handle<SimpleArchive>>::new(*head_hash);
let meta: triblespace_core::trible::TribleSet = reader.get(meta_handle).ok()?;
let name_handle: Inline<Handle<LongString>> = find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ triblespace_core::metadata::name: ?h }])
)
.next()
.or_else(|| {
find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ crate::tracking::remote_name: ?h }])
)
.next()
})?;
let name_view: anybytes::View<str> = reader.get(name_handle).ok()?;
Some(name_view.as_ref().to_string())
}