use crate::behaviour::peer_store::store::Store;
use crate::prelude::GossipsubMessage;
use crate::task::ConnexaTask;
use crate::types::{GossipsubCommand, GossipsubEvent};
use futures::channel::{mpsc, oneshot};
use libp2p::gossipsub::Event;
use libp2p::swarm::NetworkBehaviour;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
impl<X, C: NetworkBehaviour, S, T> ConnexaTask<X, C, S, T>
where
X: Default + Send + 'static,
C: Send,
C::ToSwarm: Debug,
S: Store,
{
pub fn process_gossipsub_command(&mut self, command: GossipsubCommand) {
let swarm = self.swarm.as_mut().unwrap();
match command {
GossipsubCommand::Subscribe { topic, resp, .. } => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let topic = libp2p::gossipsub::IdentTopic::new(topic.into_string());
match pubsub.subscribe(&topic) {
Ok(true) => {
let _ = resp.send(Ok(()));
}
Ok(false) => {
let _ = resp.send(Err(std::io::Error::other("topic already subscribed")));
}
Err(e) => {
let _ = resp.send(Err(std::io::Error::other(e)));
}
}
}
GossipsubCommand::Unsubscribe { topic, resp, .. } => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let topic = libp2p::gossipsub::IdentTopic::new(topic.into_string());
match pubsub.unsubscribe(&topic) {
true => {
let _ = resp.send(Ok(()));
}
false => {
let _ = resp.send(Err(std::io::Error::other("not subscribed to topic")));
}
}
}
GossipsubCommand::Peers { topic, resp, .. } => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let peers = pubsub
.all_peers()
.filter_map(|(peer_id, list)| list.contains(&&topic).then_some(*peer_id))
.collect();
let _ = resp.send(Ok(peers));
}
GossipsubCommand::Subscribed { resp, .. } => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let topics = pubsub.topics().cloned().collect();
let _ = resp.send(Ok(topics));
}
GossipsubCommand::Publish { topic, data, resp } => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let ret = match pubsub.publish(topic, data) {
Ok(_) => Ok(()),
Err(e) => Err(std::io::Error::other(e)),
};
let _ = resp.send(ret);
}
GossipsubCommand::GossipsubListener { topic, resp } => {
if !swarm.behaviour_mut().gossipsub.is_enabled() {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
}
let (tx, rx) = mpsc::channel(50);
self.gossipsub_listener.entry(topic).or_default().push(tx);
let _ = resp.send(Ok(rx));
}
GossipsubCommand::ReportMessage {
peer_id,
message_id,
accept,
resp,
} => {
let Some(pubsub) = swarm.behaviour_mut().gossipsub.as_mut() else {
let _ = resp.send(Err(std::io::Error::other("gossipsub is not enabled")));
return;
};
let is_stored_in_cache =
pubsub.report_message_validation_result(&message_id, &peer_id, accept);
let _ = resp.send(Ok(is_stored_in_cache));
}
}
}
pub fn process_gossipsub_event(&mut self, event: Event) {
let (topic, event) = match event {
Event::Message {
propagation_source,
message,
message_id,
} => {
let topic = message.topic;
let message = GossipsubMessage {
message_id,
propagated_source: propagation_source,
source: message.source,
sequence_number: message.sequence_number,
data: message.data.into(),
};
let event = GossipsubEvent::Message { message };
(topic, event)
}
Event::Subscribed { peer_id, topic } => {
let event = GossipsubEvent::Subscribed { peer_id };
(topic, event)
}
Event::Unsubscribed { peer_id, topic } => {
let event = GossipsubEvent::Unsubscribed { peer_id };
(topic, event)
}
Event::GossipsubNotSupported { peer_id } => {
tracing::info!(%peer_id, "peer does not support gossipsub");
return;
}
Event::SlowPeer {
peer_id,
failed_messages,
} => {
tracing::info!(%peer_id, ?failed_messages, "peer is slow");
return;
}
};
let Some(chs) = self.gossipsub_listener.get_mut(&topic) else {
return;
};
for ch in chs {
let _ = ch.try_send(event.clone());
}
}
}