use std::sync::Arc;
use std::time::Duration;
use p2panda_discovery::random_walk::{RandomWalker, RandomWalkerConfig};
use p2panda_discovery::{DiscoveryResult, DiscoveryStrategy};
use p2panda_store::SqliteStore;
use ractor::thread_local::ThreadLocalActor;
use ractor::{ActorProcessingErr, ActorRef, cast};
use rand_chacha::ChaCha20Rng;
use tokio::sync::Notify;
use tokio::time;
use tracing::trace;
use crate::NodeId;
use crate::addrs::NodeInfo;
use crate::discovery::DiscoveryConfig;
use crate::discovery::actors::ToDiscoveryManager;
use crate::discovery::backoff::{Backoff, Config as BackoffConfig};
/// How long the walker actor sleeps before asking itself for the next node again when the random
/// walk did not yield any node to visit.
const NO_RESULTS_DELAY: Duration = Duration::from_secs(2);
/// Success rates strictly below this value increase the backoff; every other rate resets it.
const SUCCESS_RATE_THRESHOLD: SuccessRate = 0.02;
/// Ratio reported by `WalkFromHere::success_rate`, represented as a 32-bit float.
pub type SuccessRate = f32;
/// Position from which the walker continues when it is asked for the next node.
pub enum WalkFromHere {
/// No previous discovery result is passed to the walker; reports a success rate of 1.0.
Bootstrap,
/// A previous session produced a discovery result which the walk continues from.
LastSession {
/// Result handed to the walker when selecting the next node; the number of its transport
/// infos is the denominator of the success rate.
discovery_result: DiscoveryResult<NodeId, NodeInfo>,
/// Numerator of the success rate. Presumably the number of received transport infos which
/// were not known before the session (confirm against the code constructing this variant).
newly_learned_transport_infos: usize,
},
/// A session did not succeed; reports a success rate of 0.0.
FailedSession {
/// Earlier discovery result to continue the walk from, if one is available. Going by the
/// field name this is the result of the last successful session.
last_successful: Option<DiscoveryResult<NodeId, NodeInfo>>,
},
}
impl WalkFromHere {
    /// Returns how productive the previous step was, as a ratio.
    ///
    /// * `Bootstrap` always reports `1.0`.
    /// * `LastSession` reports the number of newly learned transport infos divided by the number
    ///   of transport infos contained in the discovery result. A result without any transport
    ///   infos reports `0.0`.
    /// * `FailedSession` always reports `0.0`.
    pub fn success_rate(&self) -> SuccessRate {
        match self {
            WalkFromHere::Bootstrap => 1.0,
            WalkFromHere::LastSession {
                discovery_result,
                newly_learned_transport_infos,
            } => {
                let received = discovery_result.transport_infos.len();
                if received == 0 {
                    // Dividing by zero would yield NaN (or infinity). NaN compares as "not
                    // below" any threshold, which would make an empty result look like a
                    // successful one. Nothing was received, so nothing was learned.
                    0.0
                } else {
                    *newly_learned_transport_infos as f32 / received as f32
                }
            }
            WalkFromHere::FailedSession { .. } => 0.0,
        }
    }

    /// Returns the discovery result the walker should continue from, if there is one.
    ///
    /// `Bootstrap` never has one, `LastSession` always has one and `FailedSession` only when an
    /// earlier result was kept around.
    pub fn next_node_args(&self) -> Option<&DiscoveryResult<NodeId, NodeInfo>> {
        match self {
            WalkFromHere::Bootstrap => None,
            WalkFromHere::LastSession {
                discovery_result, ..
            } => Some(discovery_result),
            WalkFromHere::FailedSession { last_successful } => last_successful.as_ref(),
        }
    }
}
/// Messages handled by the `DiscoveryWalker` actor.
pub enum ToDiscoveryWalker {
/// Select the next node to visit, continuing from the given position, and ask the discovery
/// manager to initiate a session with it.
NextNode(WalkFromHere),
}
/// State held by a running `DiscoveryWalker` actor.
pub struct DiscoveryWalkerState {
/// Manager actor which receives the `InitiateSession` request for each selected node.
manager_ref: ActorRef<ToDiscoveryManager>,
/// Random walk used to select the next node to visit.
walker: RandomWalker<ChaCha20Rng, SqliteStore, NodeId, NodeInfo>,
/// Backoff awaited before each step; incremented after a low success rate, reset otherwise.
backoff: Backoff,
/// Notification which, when it arrives while the actor waits on the backoff, restarts the walk
/// from `WalkFromHere::Bootstrap` and resets the backoff.
walker_reset: Arc<Notify>,
}
/// Actor which selects the next node via a random walk and asks the discovery manager to initiate
/// a session with it.
#[derive(Default)]
pub struct DiscoveryWalker;
impl ThreadLocalActor for DiscoveryWalker {
type State = DiscoveryWalkerState;
type Msg = ToDiscoveryWalker;
type Arguments = (
NodeId,
DiscoveryConfig,
SqliteStore,
ChaCha20Rng,
Arc<Notify>,
ActorRef<ToDiscoveryManager>,
);
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (my_node_id, config, store, rng, walker_reset, manager_ref) = args;
Ok(DiscoveryWalkerState {
manager_ref,
walker: RandomWalker::from_config(
my_node_id,
store,
rng.clone(),
RandomWalkerConfig {
reset_walk_probability: config.reset_walk_probability,
},
),
backoff: Backoff::new(BackoffConfig::default(), rng),
walker_reset,
})
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ToDiscoveryWalker::NextNode(mut walk_from_here) => {
if walk_from_here.success_rate() < SUCCESS_RATE_THRESHOLD {
state.backoff.increment();
} else {
state.backoff.reset();
}
tokio::select! {
_ = state.walker_reset.notified() => {
trace!("received notification to reset walker and backoff");
walk_from_here = WalkFromHere::Bootstrap;
state.backoff.reset();
}
_ = state.backoff.sleep() => (),
}
let node_id = state
.walker
.next_node(walk_from_here.next_node_args())
.await?;
match node_id {
Some(node_id) => {
if cast!(
state.manager_ref,
ToDiscoveryManager::InitiateSession(node_id, myself)
)
.is_err()
{
trace!("parent actor not available, probably winding down");
}
}
None => {
time::sleep(NO_RESULTS_DELAY).await;
let _ = myself
.send_message(ToDiscoveryWalker::NextNode(WalkFromHere::Bootstrap));
}
}
}
}
Ok(())
}
}