#![cfg_attr(docsrs, feature(doc_cfg))]
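//! A runtime-agnostic repository for Automerge documents.
//!
//! A [`Repo`] manages a hub plus one actor per document. Documents are
//! created with [`Repo::create`] and looked up with [`Repo::find`]; peers
//! are reached over pluggable [`Transport`]s via dialers ([`Repo::dial`])
//! and listeners ([`Repo::make_acceptor`]). Storage backends live in the
//! [`storage`] module, and repository events can be watched through a
//! [`RepoObserver`].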
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use automerge::Automerge;
use futures::{FutureExt, Stream, StreamExt, channel::oneshot, stream::FuturesUnordered};
use rand::SeedableRng;
pub use samod_core::{
AutomergeUrl, BackoffConfig, ConnectionId, DialerId, DocumentId, ListenerId, PeerId, StorageId,
network::ConnDirection,
};
use samod_core::{
CommandId, CommandResult, DocumentActorId, LoaderState, UnixTimestamp,
actors::{
DocToHubMsg,
document::{DocumentActor, SpawnArgs},
hub::{DispatchedCommand, Hub, HubEvent, HubResults, io::HubIoAction},
},
io::{IoResult, IoTask},
network::{ConnectionEvent, ConnectionOwner, DialerConfig, ListenerConfig},
};
use tracing::Instrument;
pub use url::Url;
mod actor_task;
use actor_task::ActorTask;
mod actor_handle;
use actor_handle::ActorHandle;
mod announce_policy;
mod builder;
pub use builder::RepoBuilder;
mod acceptor_handle;
pub use acceptor_handle::{AcceptorEvent, AcceptorHandle};
mod conn_finished_reason;
mod connection;
mod dialer_handle;
pub use conn_finished_reason::ConnFinishedReason;
pub use connection::Connection;
pub use dialer_handle::{DialerEvent, DialerFailed, DialerHandle};
mod dialer;
pub use dialer::Dialer;
mod doc_actor_inner;
mod doc_handle;
mod doc_runner;
mod io_loop;
mod observer;
pub use doc_handle::DocHandle;
pub use observer::{RepoEvent, RepoObserver, StorageOperation};
mod peer_connection_info;
mod peer_info;
pub use peer_connection_info::{ConnectionInfo, ConnectionState, PeerDocState};
pub use peer_info::PeerInfo;
mod stopped;
pub use stopped::Stopped;
pub mod storage;
pub mod transport;
pub use crate::announce_policy::{
AlwaysAnnounce, AnnouncePolicy, LocalAnnouncePolicy, NeverAnnounce,
};
pub use crate::builder::ConcurrencyConfig;
use crate::{
connection::ConnectionHandle,
doc_actor_inner::DocActorInner,
doc_runner::{DocRunner, SpawnedActor},
io_loop::{DriveConnectionTask, IoLoopTask},
storage::Storage,
unbounded::{UnboundedReceiver, UnboundedSender},
};
use crate::{
runtime::{LocalRuntimeHandle, RuntimeHandle},
storage::{InMemoryStorage, LocalStorage},
};
pub use transport::Transport;
pub mod runtime;
#[cfg(feature = "tokio")]
pub mod tokio_io;
mod unbounded;
pub mod websocket;
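/// A handle to a running repository of Automerge documents.
///
/// `Repo` is cheap to clone: every clone points at the same shared state,
/// so handles can be passed freely between tasks.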
#[derive(Clone)]
pub struct Repo {
inner: Arc<Mutex<Inner>>,
}
impl std::fmt::Debug for Repo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Repo").finish()
}
}
impl Repo {
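    /// Starts building a [`Repo`] whose background tasks are spawned on
    /// `runtime`.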
pub fn builder<R: runtime::RuntimeHandle>(
runtime: R,
) -> RepoBuilder<InMemoryStorage, R, AlwaysAnnounce> {
builder::RepoBuilder::new(runtime)
}
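    /// Shorthand for [`Repo::builder`] using the current tokio runtime
    /// handle. Panics if called outside a tokio runtime.
    ///
    /// A minimal sketch (assuming this crate is built as `samod` and a tokio
    /// runtime is already running):
    ///
    /// ```no_run
    /// # async fn example() {
    /// let repo = samod::Repo::build_tokio().load().await;
    /// # _ = repo;
    /// # }
    /// ```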
#[cfg(feature = "tokio")]
pub fn build_tokio() -> RepoBuilder<InMemoryStorage, ::tokio::runtime::Handle, AlwaysAnnounce> {
builder::RepoBuilder::new(::tokio::runtime::Handle::current())
}
pub fn build_localpool(
spawner: futures::executor::LocalSpawner,
) -> RepoBuilder<InMemoryStorage, futures::executor::LocalSpawner, AlwaysAnnounce> {
builder::RepoBuilder::new(spawner)
}
#[cfg(feature = "gio")]
pub fn build_gio()
-> RepoBuilder<InMemoryStorage, crate::runtime::gio::GioRuntime, AlwaysAnnounce> {
builder::RepoBuilder::new(crate::runtime::gio::GioRuntime::new())
}
pub(crate) async fn load<
R: runtime::RuntimeHandle + Clone + Send,
S: Storage,
A: AnnouncePolicy,
>(
builder: RepoBuilder<S, R, A>,
) -> Self {
let RepoBuilder {
storage,
runtime,
peer_id,
announce_policy,
concurrency,
observer,
} = builder;
let task_setup = TaskSetup::new(storage.clone(), peer_id, concurrency, observer).await;
let inner = task_setup.inner.clone();
task_setup.spawn_tasks(runtime, storage, announce_policy);
Self { inner }
}
pub(crate) async fn load_local<
'a,
R: runtime::LocalRuntimeHandle + Clone + 'static,
S: LocalStorage + 'a,
A: LocalAnnouncePolicy + 'a,
>(
builder: RepoBuilder<S, R, A>,
) -> Self {
let RepoBuilder {
storage,
runtime,
peer_id,
announce_policy,
concurrency,
observer,
} = builder;
let task_setup = TaskSetup::new(storage.clone(), peer_id, concurrency, observer).await;
let inner = task_setup.inner.clone();
task_setup.spawn_tasks_local(runtime, storage, announce_policy);
Self { inner }
}
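    /// Creates a new document seeded with `initial_content` and returns a
    /// [`DocHandle`] to it. Fails with [`Stopped`] if the repository has
    /// been stopped.
    ///
    /// A minimal sketch (assuming this crate is built as `samod` and a
    /// `repo` obtained from one of the builders):
    ///
    /// ```no_run
    /// # async fn example(repo: samod::Repo) -> Result<(), samod::Stopped> {
    /// let doc = repo.create(automerge::Automerge::new()).await?;
    /// # _ = doc;
    /// # Ok(())
    /// # }
    /// ```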
pub async fn create(&self, initial_content: Automerge) -> Result<DocHandle, Stopped> {
let (tx, rx) = oneshot::channel();
{
let DispatchedCommand { command_id, event } =
HubEvent::create_document(initial_content);
            let mut inner = self.inner.lock().unwrap();
            // Register the result sender before dispatching the event, so a
            // synchronously completed command is not dropped (matching the
            // ordering used by `find`, `dial` and `make_acceptor`).
            inner.pending_commands.insert(command_id, tx);
            inner.handle_event(event);
            drop(inner);
}
let inner = self.inner.clone();
match rx.await {
Ok(r) => match r {
CommandResult::CreateDocument {
actor_id,
document_id: _,
                } => {
                    let inner = inner.lock().unwrap();
                    Ok(inner
                        .actors
                        .get(&actor_id)
                        .map(|ActorHandle { doc: handle, .. }| handle.clone())
                        .expect("actor should exist"))
                }
other => {
panic!("unexpected command result for create: {other:?}");
}
},
Err(_) => Err(Stopped),
}
}
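    /// Resolves the document with the given ID to a [`DocHandle`], or to
    /// `None` if the document cannot be found. Fails with [`Stopped`] if the
    /// repository shuts down before the lookup completes.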
pub fn find(
&self,
doc_id: DocumentId,
) -> impl Future<Output = Result<Option<DocHandle>, Stopped>> + 'static {
let mut inner = self.inner.lock().unwrap();
let DispatchedCommand { command_id, event } = HubEvent::find_document(doc_id);
let (tx, rx) = oneshot::channel();
inner.pending_commands.insert(command_id, tx);
inner.handle_event(event);
drop(inner);
let inner = self.inner.clone();
async move {
match rx.await {
Ok(r) => match r {
CommandResult::FindDocument { actor_id, found } => {
if found {
let handle = inner
.lock()
.unwrap()
.actors
.get(&actor_id)
.map(|ActorHandle { doc: handle, .. }| handle.clone())
.expect("actor should exist");
Ok(Some(handle))
} else {
Ok(None)
}
}
other => {
panic!("unexpected command result for create: {other:?}");
}
},
Err(_) => Err(Stopped),
}
}
}
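    /// Resolves once a connection to `peer_id` has completed its handshake.
    ///
    /// If such a connection already exists this returns immediately;
    /// otherwise the returned future waits until one is established, or
    /// fails with [`Stopped`] if the repository shuts down first.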
pub fn when_connected(
&self,
peer_id: PeerId,
) -> impl Future<Output = Result<Connection, Stopped>> + 'static {
let inner = self.inner.clone();
async move {
let rx = {
let mut inner = inner.lock().unwrap();
for conn_handle in inner.connections.values() {
if conn_handle.info().map(|i| i.peer_id).as_ref() == Some(&peer_id) {
return Ok(Connection::new(conn_handle.clone()));
}
}
let (tx, rx) = oneshot::channel();
inner
.waiting_for_connection
.entry(peer_id)
.or_default()
.push(tx);
rx
};
rx.await.map_err(|_| Stopped)
}
}
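    /// The peer ID this repository identifies itself with on connections.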
pub fn peer_id(&self) -> PeerId {
self.inner.lock().unwrap().hub.peer_id().clone()
}
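    /// Returns a snapshot of the current connections together with a stream
    /// which yields an updated snapshot whenever connection state changes.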
pub fn connected_peers(
&self,
) -> (
Vec<ConnectionInfo>,
impl Stream<Item = Vec<ConnectionInfo>> + Unpin + 'static,
) {
let (tx_events, rx_events) = unbounded::channel();
let mut inner = self.inner.lock().unwrap();
inner.conn_listeners.push(tx_events);
let now = inner
.hub
.connections()
.clone()
.into_iter()
.map(|c| c.into())
.collect();
(now, rx_events)
}
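    /// Registers a [`Dialer`] for establishing an outgoing connection,
    /// retried according to `backoff`. The returned [`DialerHandle`] reports
    /// connection attempts and can be used to remove the dialer again.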
pub fn dial(
&self,
backoff: BackoffConfig,
dialer: Arc<dyn Dialer>,
) -> Result<DialerHandle, Stopped> {
let mut inner = self.inner.lock().unwrap();
let url = dialer.url();
let config = DialerConfig { url, backoff };
let DispatchedCommand { command_id, event } = HubEvent::add_dialer(config);
let (tx_result, mut rx_result) = oneshot::channel();
inner.pending_commands.insert(command_id, tx_result);
inner.handle_event(event);
let dialer_id = match rx_result.try_recv() {
Ok(Some(CommandResult::AddDialer { dialer_id })) => dialer_id,
            Ok(None) => return Err(Stopped),
            Ok(Some(other)) => panic!("unexpected command result {:?} for add_dialer", other),
Err(_) => return Err(Stopped),
};
inner.dialers.lock().unwrap().insert(dialer_id, dialer);
let handle = DialerHandle::new(dialer_id, self.clone());
inner.dialer_handles.insert(dialer_id, handle.clone());
Ok(handle)
}
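    /// Returns an [`AcceptorHandle`] for accepting incoming connections on
    /// `url`. If a listener is already registered for `url` its existing
    /// handle is reused.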
pub fn make_acceptor(&self, url: url::Url) -> Result<AcceptorHandle, Stopped> {
let mut inner = self.inner.lock().unwrap();
if let Some(listener_id) = inner.find_listener_for_url(&url)
&& let Some(handle) = inner.acceptor_handles.get(&listener_id)
{
return Ok(handle.clone());
}
let config = ListenerConfig { url };
let DispatchedCommand { command_id, event } = HubEvent::add_listener(config);
let (tx_result, mut rx_result) = oneshot::channel();
inner.pending_commands.insert(command_id, tx_result);
inner.handle_event(event);
let listener_id = match rx_result.try_recv() {
Ok(Some(CommandResult::AddListener { listener_id })) => listener_id,
Ok(None) => return Err(Stopped),
Ok(Some(other)) => panic!("unexpected command result {:?} for add_listener", other),
Err(_) => return Err(Stopped),
};
let handle = AcceptorHandle::new(listener_id, self.clone());
inner.acceptor_handles.insert(listener_id, handle.clone());
Ok(handle)
}
pub(crate) fn remove_dialer_by_id(&self, dialer_id: DialerId) -> Result<(), Stopped> {
let mut inner = self.inner.lock().unwrap();
inner.dialers.lock().unwrap().remove(&dialer_id);
inner.dialer_handles.remove(&dialer_id);
let event = HubEvent::remove_dialer(dialer_id);
inner.handle_event(event);
Ok(())
}
pub(crate) fn remove_listener_by_id(&self, listener_id: ListenerId) -> Result<(), Stopped> {
let mut inner = self.inner.lock().unwrap();
inner.acceptor_handles.remove(&listener_id);
let event = HubEvent::remove_listener(listener_id);
inner.handle_event(event);
Ok(())
}
pub(crate) fn accept_on_listener(
&self,
listener_id: ListenerId,
transport: crate::Transport,
) -> Result<ConnectionHandle, Stopped> {
let mut inner = self.inner.lock().unwrap();
let DispatchedCommand { command_id, event } =
HubEvent::create_listener_connection(listener_id);
let (tx_result, mut rx_result) = oneshot::channel();
inner.pending_commands.insert(command_id, tx_result);
inner.handle_event(event);
let connection_id = match rx_result.try_recv() {
Ok(Some(CommandResult::CreateConnection { connection_id })) => connection_id,
            Ok(None) => return Err(Stopped),
            Ok(Some(other)) => panic!(
                "unexpected command result {:?} for create_listener_connection",
                other
            ),
Err(_) => return Err(Stopped),
};
if let Some(obs) = inner.observer.as_ref() {
            obs.observe(&RepoEvent::ConnectionEstablished { connection_id });
}
let conn_handle = inner
.connections
.get(&connection_id)
.expect("connection not found")
.clone();
let loop_task = IoLoopTask::DriveConnection(DriveConnectionTask {
conn_handle: conn_handle.clone(),
stream: transport.stream,
sink: transport.sink,
});
inner.tx_io.unbounded_send(loop_task).map_err(|_| Stopped)?;
Ok(conn_handle)
}
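    /// Initiates shutdown and returns a future which resolves once the hub
    /// has fully stopped.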
pub fn stop(&self) -> impl Future<Output = ()> + 'static {
let (tx, rx) = oneshot::channel();
{
let mut inner = self.inner.lock().unwrap();
inner.stop_waiters.push(tx);
inner.handle_event(HubEvent::stop());
}
async move {
if rx.await.is_err() {
tracing::warn!("stop signal was dropped");
}
}
}
}
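/// The state shared by every clone of a [`Repo`], guarded by a single mutex.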
struct Inner {
doc_runner: DocRunner,
actors: HashMap<DocumentActorId, ActorHandle>,
hub: Hub,
pending_commands: HashMap<CommandId, oneshot::Sender<CommandResult>>,
connections: HashMap<ConnectionId, ConnectionHandle>,
conn_listeners: Vec<unbounded::UnboundedSender<Vec<ConnectionInfo>>>,
tx_io: UnboundedSender<io_loop::IoLoopTask>,
tx_to_core: UnboundedSender<(DocumentActorId, DocToHubMsg)>,
waiting_for_connection: HashMap<PeerId, Vec<oneshot::Sender<Connection>>>,
stop_waiters: Vec<oneshot::Sender<()>>,
rng: rand::rngs::StdRng,
dialers: Arc<Mutex<HashMap<DialerId, io_loop::DynDialer>>>,
dialer_handles: HashMap<DialerId, DialerHandle>,
acceptor_handles: HashMap<ListenerId, AcceptorHandle>,
observer: Option<Arc<dyn observer::RepoObserver>>,
}
impl Inner {
fn find_listener_for_url(&self, url: &url::Url) -> Option<ListenerId> {
self.hub.find_listener_for_url(url)
}
fn dispatch_task(&self, actor_id: DocumentActorId, task: ActorTask) {
match &self.doc_runner {
#[cfg(feature = "threadpool")]
DocRunner::Threadpool(threadpool) => {
let Some(actor_handle) = self.actors.get(&actor_id) else {
tracing::warn!(?actor_id, "received task for unknown actor");
return;
};
let inner = actor_handle.inner.clone();
threadpool.spawn(move || {
let mut guard = inner.lock().unwrap();
guard.handle_task(task);
});
}
DocRunner::Async { task_senders, .. } => {
let Some(tx) = task_senders.get(&actor_id) else {
tracing::warn!(?actor_id, "received task for unknown actor");
return;
};
let _ = tx.unbounded_send(task);
}
}
}
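    /// Feeds one event into the hub and carries out everything the hub asks
    /// for in response: spawning document actors, completing pending
    /// commands, performing connection IO, forwarding messages to actors,
    /// notifying connection/dialer/listener handles, dispatching dial
    /// requests, and waking stop waiters once the hub reports it has
    /// stopped.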
#[tracing::instrument(skip(self, event), fields(local_peer_id=%self.hub.peer_id()))]
fn handle_event(&mut self, event: HubEvent) {
if self.hub.is_stopped() {
tracing::trace!("ignoring event on stopped hub");
return;
}
let hub_start = std::time::Instant::now();
let now = UnixTimestamp::now();
let HubResults {
new_tasks,
completed_commands,
spawn_actors,
actor_messages,
stopped,
connection_events,
dial_requests,
dialer_events,
event_type,
connections_count,
documents_count,
} = self.hub.handle_event(&mut self.rng, now, event);
for spawn_args in spawn_actors {
self.spawn_actor(spawn_args);
}
for (command_id, command) in completed_commands {
if let CommandResult::Receive { .. } = &command {
continue;
}
if let Some(tx) = self.pending_commands.remove(&command_id) {
if let CommandResult::CreateConnection { connection_id } = &command {
self.connections
.insert(*connection_id, ConnectionHandle::new(*connection_id));
}
let _ = tx.send(command);
} else {
tracing::warn!("Received result for unknown command: {:?}", command_id);
}
}
for task in new_tasks {
match task.action {
HubIoAction::Send { connection_id, msg } => {
if let Some(connhandle) = self.connections.get(&connection_id) {
connhandle.send(msg);
} else {
tracing::warn!(
"Tried to send message on unknown connection: {:?}",
connection_id
);
}
}
HubIoAction::Disconnect { connection_id } => {
if self.connections.remove(&connection_id).is_none() {
tracing::warn!(
"Tried to disconnect unknown connection: {:?}",
connection_id
);
}
}
}
}
for (actor_id, actor_msg) in actor_messages {
self.dispatch_task(actor_id, ActorTask::HandleMessage(actor_msg));
}
if !connection_events.is_empty() && !self.conn_listeners.is_empty() {
let new_infos = self.hub.connections();
self.conn_listeners.retain(|tx| {
tx.unbounded_send(new_infos.clone().into_iter().map(|c| c.into()).collect())
.is_ok()
});
}
for evt in connection_events {
match evt {
ConnectionEvent::HandshakeCompleted {
connection_id,
owner,
peer_info,
} => {
if let Some(conn_handle) = self.connections.get(&connection_id) {
let samod_peer_info: PeerInfo = peer_info.clone().into();
conn_handle.notify_handshake_complete(samod_peer_info.clone());
                        if let Some(waiters) =
                            self.waiting_for_connection.get_mut(&peer_info.peer_id)
                        {
                            for tx in waiters.drain(..) {
let _ = tx.send(Connection::new(conn_handle.clone()));
}
}
match owner {
ConnectionOwner::Dialer(dialer_id) => {
if let Some(dh) = self.dialer_handles.get(&dialer_id) {
dh.notify_connected(samod_peer_info, connection_id);
}
}
ConnectionOwner::Listener(listener_id) => {
if let Some(ah) = self.acceptor_handles.get(&listener_id) {
ah.notify_client_connected(
samod_peer_info.clone(),
connection_id,
);
}
conn_handle.notify_client_connected(samod_peer_info);
}
}
}
}
ConnectionEvent::ConnectionFailed {
connection_id,
owner,
error,
} => {
tracing::error!(
?connection_id,
?error,
"connection failed, notifying waiting tasks",
);
if let Some(ref obs) = self.observer {
obs.observe(&observer::RepoEvent::ConnectionLost { connection_id });
}
match owner {
ConnectionOwner::Dialer(dialer_id) => {
if let Some(dh) = self.dialer_handles.get(&dialer_id) {
dh.notify_disconnected(error.clone());
}
}
ConnectionOwner::Listener(listener_id) => {
if let Some(ah) = self.acceptor_handles.get(&listener_id) {
ah.notify_client_disconnected(
connection_id,
ConnFinishedReason::ErrorReceiving(error.clone()),
);
if let Some(conn_handle) = self.connections.get(&connection_id) {
conn_handle.notify_client_disconnected(
ConnFinishedReason::ErrorReceiving(error.clone()),
);
                                }
}
}
}
self.connections.remove(&connection_id);
}
                ConnectionEvent::StateChanged { .. } => {
                    // State changes are already broadcast to `conn_listeners`
                    // via the connection-info snapshot sent above, so there
                    // is nothing more to do here.
                }
}
}
for request in dial_requests {
tracing::debug!(
dialer_id = ?request.dialer_id,
url = %request.url,
"dispatching dial request to IO loop"
);
if let Some(dh) = self.dialer_handles.get(&request.dialer_id) {
if let Some(attempt) = self.hub.dialer_attempt(request.dialer_id)
&& attempt > 0
{
dh.notify_reconnecting(attempt);
}
}
let task = IoLoopTask::EstablishTransport {
dialer_id: request.dialer_id,
url: request.url,
};
if self.tx_io.unbounded_send(task).is_err() {
tracing::error!("IO loop channel closed, cannot dispatch dial request");
}
}
for event in dialer_events {
match &event {
samod_core::DialerEvent::MaxRetriesReached { dialer_id, url } => {
tracing::warn!(
?dialer_id,
%url,
"dialer exhausted retry budget"
);
if let Some(dh) = self.dialer_handles.get(dialer_id) {
dh.notify_max_retries_reached();
}
}
}
}
if let Some(ref obs) = self.observer {
obs.observe(&observer::RepoEvent::HubEventProcessed {
duration: hub_start.elapsed(),
event_type,
connections: connections_count,
documents: documents_count,
});
}
if stopped {
for waiter in self.stop_waiters.drain(..) {
let _ = waiter.send(());
}
}
}
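    /// Creates the [`DocumentActor`] for `args`, registers its handle, and
    /// hands it to the configured [`DocRunner`]: threadpool runners process
    /// the initialisation results inline, async runners receive the actor
    /// via the spawn channel.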
#[tracing::instrument(skip(self, args))]
fn spawn_actor(&mut self, args: SpawnArgs) {
let actor_id = args.actor_id();
let doc_id = args.document_id().clone();
let (actor, init_results) = DocumentActor::new(UnixTimestamp::now(), args);
if let Some(ref obs) = self.observer {
obs.observe(&observer::RepoEvent::DocumentOpened {
document_id: doc_id.clone(),
});
}
let doc_inner = Arc::new(Mutex::new(DocActorInner::new(
doc_id.clone(),
actor_id,
actor,
self.tx_to_core.clone(),
self.tx_io.clone(),
self.observer.clone(),
)));
let handle = DocHandle::new(doc_id.clone(), doc_inner.clone());
self.actors.insert(
actor_id,
ActorHandle {
inner: doc_inner.clone(),
doc: handle,
},
);
match &mut self.doc_runner {
#[cfg(feature = "threadpool")]
DocRunner::Threadpool(_threadpool) => {
doc_inner.lock().unwrap().handle_results(init_results);
}
DocRunner::Async {
tx_spawn,
task_senders,
} => {
let (tx, rx) = unbounded::channel();
task_senders.insert(actor_id, tx);
if tx_spawn
.unbounded_send(SpawnedActor {
doc_id,
actor_id,
inner: doc_inner,
rx_tasks: rx,
init_results,
})
.is_err()
{
tracing::error!(?actor_id, "actor spawner is gone");
}
}
}
}
}
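/// Runs document actors as futures on the async runtime: each actor spawned
/// via `DocRunner::Async` processes tasks from its queue until it stops, and
/// all actors are polled concurrently through a `FuturesUnordered`.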
async fn async_actor_runner(rx: UnboundedReceiver<SpawnedActor>) {
let mut running_actors = FuturesUnordered::new();
loop {
futures::select! {
spawn_actor = rx.recv().fuse() => {
match spawn_actor {
Err(_e) => {
tracing::trace!("actor spawner task finished");
break;
}
Ok(SpawnedActor { inner, rx_tasks, init_results, doc_id, actor_id }) => {
running_actors.push(async move {
inner.lock().unwrap().handle_results(init_results);
while let Ok(actor_task) = rx_tasks.recv().await {
let mut inner = inner.lock().unwrap();
inner.handle_task(actor_task);
if inner.is_stopped() {
tracing::debug!(?doc_id, ?actor_id, "actor stopped");
break;
}
}
});
}
}
},
            // An actor future finished; keep polling the rest.
            _ = running_actors.next() => {}
}
}
    // The spawner channel has closed; drive the remaining actors to
    // completion before returning.
    while running_actors.next().await.is_some() {}
}
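/// The pieces produced while constructing a repository, before its
/// background tasks have been spawned: the shared [`Inner`] state plus the
/// channel endpoints consumed by those tasks.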
struct TaskSetup {
peer_id: PeerId,
inner: Arc<Mutex<Inner>>,
tx_io: UnboundedSender<IoLoopTask>,
rx_storage: UnboundedReceiver<IoLoopTask>,
rx_from_core: UnboundedReceiver<(DocumentActorId, DocToHubMsg)>,
rx_actor: Option<UnboundedReceiver<SpawnedActor>>,
dialers: Arc<Mutex<HashMap<DialerId, io_loop::DynDialer>>>,
observer: Option<Arc<dyn observer::RepoObserver>>,
}
impl TaskSetup {
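    /// Loads the hub state machine from storage and wires up the channels
    /// shared between the hub, the IO loop and the document actors. The
    /// returned [`TaskSetup`] must still be passed to one of the
    /// `spawn_tasks*` methods to actually start the background tasks.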
async fn new<S: LocalStorage>(
storage: S,
peer_id: Option<PeerId>,
concurrency: ConcurrencyConfig,
observer: Option<Arc<dyn observer::RepoObserver>>,
) -> TaskSetup {
let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
let peer_id = peer_id.unwrap_or_else(|| PeerId::new_with_rng(&mut rng));
let hub = load_hub(storage.clone(), Hub::load(peer_id.clone())).await;
let (tx_storage, rx_storage) = unbounded::channel();
let tx_io_for_tick = tx_storage.clone();
let (tx_to_core, rx_from_core) = unbounded::channel();
let rx_actor: Option<UnboundedReceiver<SpawnedActor>>;
let doc_runner = match concurrency {
#[cfg(feature = "threadpool")]
ConcurrencyConfig::Threadpool(threadpool) => {
rx_actor = None;
DocRunner::Threadpool(threadpool)
}
ConcurrencyConfig::AsyncRuntime => {
let (tx, rx) = unbounded::channel();
rx_actor = Some(rx);
DocRunner::Async {
tx_spawn: tx,
task_senders: HashMap::new(),
}
}
};
let dialers = Arc::new(Mutex::new(HashMap::new()));
let inner = Arc::new(Mutex::new(Inner {
doc_runner,
actors: HashMap::new(),
hub: *hub,
pending_commands: HashMap::new(),
connections: HashMap::new(),
conn_listeners: Vec::new(),
tx_io: tx_storage,
tx_to_core,
waiting_for_connection: HashMap::new(),
stop_waiters: Vec::new(),
rng: rand::rngs::StdRng::from_os_rng(),
dialers: dialers.clone(),
dialer_handles: HashMap::new(),
acceptor_handles: HashMap::new(),
observer: observer.clone(),
}));
TaskSetup {
peer_id,
inner,
tx_io: tx_io_for_tick,
rx_actor,
rx_from_core,
rx_storage,
dialers,
observer,
}
}
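    /// Spawns the background tasks (IO loop, actor-to-hub message pump,
    /// connector tick loop and, if configured, the async actor runner) on a
    /// runtime that can run non-`Send` futures.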
fn spawn_tasks_local<
R: LocalRuntimeHandle + Clone + 'static,
S: LocalStorage,
A: LocalAnnouncePolicy,
>(
self,
runtime: R,
storage: S,
announce_policy: A,
) {
runtime.spawn(
io_loop::io_loop(
self.peer_id.clone(),
self.inner.clone(),
storage,
announce_policy,
self.rx_storage,
self.dialers.clone(),
self.observer.clone(),
)
.boxed_local(),
);
runtime.spawn({
let peer_id = self.peer_id.clone();
let inner = self.inner.clone();
async move {
let rx = self.rx_from_core;
while let Ok((actor_id, msg)) = rx.recv().await {
let event = HubEvent::actor_message(actor_id, msg);
inner.lock().unwrap().handle_event(event);
}
}
.instrument(tracing::info_span!("actor_loop", local_peer_id=%peer_id))
.boxed_local()
});
{
let tx_io = self.tx_io.clone();
let runtime_for_tick = runtime.clone();
let sleep = move |d| runtime_for_tick.sleep(d);
runtime.spawn(connector_tick_loop(tx_io, sleep).boxed_local());
}
if let Some(rx_actor) = self.rx_actor {
runtime.spawn(async_actor_runner(rx_actor).boxed_local());
}
}
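    /// Spawns the same background tasks as [`TaskSetup::spawn_tasks_local`],
    /// but boxed as `Send` futures for a multi-threaded runtime.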
fn spawn_tasks<R: RuntimeHandle + Clone + Send, S: Storage, A: AnnouncePolicy>(
self,
runtime: R,
storage: S,
announce_policy: A,
) {
runtime.spawn(
io_loop::io_loop(
self.peer_id.clone(),
self.inner.clone(),
storage,
announce_policy,
self.rx_storage,
self.dialers.clone(),
self.observer.clone(),
)
.boxed(),
);
runtime.spawn({
let peer_id = self.peer_id.clone();
let inner = self.inner.clone();
async move {
let rx = self.rx_from_core;
while let Ok((actor_id, msg)) = rx.recv().await {
let event = HubEvent::actor_message(actor_id, msg);
inner.lock().unwrap().handle_event(event);
}
}
.instrument(tracing::info_span!("actor_loop", local_peer_id=%peer_id))
.boxed()
});
{
let tx_io = self.tx_io.clone();
let runtime_for_tick = runtime.clone();
let sleep = move |d| runtime_for_tick.sleep(d);
runtime.spawn(connector_tick_loop(tx_io, sleep).boxed());
}
if let Some(rx_actor) = self.rx_actor {
runtime.spawn(async_actor_runner(rx_actor).boxed());
}
}
}
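/// Wakes the IO loop with an [`IoLoopTask::Tick`] every 100ms so it can
/// perform time-based work, exiting once the IO channel closes.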
async fn connector_tick_loop<F, Fut>(tx_io: UnboundedSender<IoLoopTask>, sleep: F)
where
F: Fn(std::time::Duration) -> Fut,
Fut: std::future::Future<Output = ()>,
{
loop {
sleep(std::time::Duration::from_millis(100)).await;
if tx_io.unbounded_send(IoLoopTask::Tick).is_err() {
break;
}
}
}
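/// Drives the [`samod_core::SamodLoader`] state machine, servicing the
/// storage IO it requests, until it produces a loaded [`Hub`].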
async fn load_hub<S: LocalStorage>(storage: S, mut loading: samod_core::SamodLoader) -> Box<Hub> {
let mut rng = rand::rngs::StdRng::from_os_rng();
let mut running_tasks = FuturesUnordered::new();
loop {
match loading.step(&mut rng, UnixTimestamp::now()) {
LoaderState::NeedIo(items) => {
for IoTask {
task_id,
action: task,
} in items
{
let storage = storage.clone();
running_tasks.push(async move {
let result = io_loop::dispatch_storage_task(task, storage).await;
(task_id, result)
})
}
}
LoaderState::Loaded(hub) => break hub,
}
let (task_id, next_result) = running_tasks.select_next_some().await;
loading.provide_io_result(IoResult {
task_id,
payload: next_result,
});
}
}
#[cfg(test)]
mod tests {
use std::marker::PhantomData;
fn assert_send<S: Send>(_s: PhantomData<S>) {}
#[cfg(feature = "tokio")]
fn assert_send_value<S: Send>(_s: impl Fn() -> S) {}
#[test]
fn make_sure_it_is_send() {
assert_send::<super::storage::InMemoryStorage>(PhantomData);
assert_send::<super::Repo>(PhantomData);
#[cfg(feature = "tokio")]
assert_send_value(|| crate::Repo::build_tokio().load());
}
}