rust-ipfs 0.15.0

IPFS node implementation
Documentation
use clap::Parser;
use futures::{FutureExt, StreamExt};
use rust_ipfs::p2p::MultiaddrExt;
use rust_ipfs::{builder::IpfsBuilder, Ipfs, Keypair, Multiaddr};

use connexa::prelude::{ConnectionEvent, GossipsubEvent};
use pollable_map::stream::StreamMap;
use rustyline_async::Readline;
use std::time::Duration;
use std::{io::Write, sync::Arc};
use tokio::sync::Notify;

#[derive(Debug, Parser)]
#[clap(name = "pubsub")]
struct Opt {
    #[clap(long)]
    bootstrap: bool,
    #[clap(long)]
    use_mdns: bool,
    #[clap(long)]
    use_relay: bool,
    #[clap(long)]
    relay_addrs: Vec<Multiaddr>,
    #[clap(long)]
    use_upnp: bool,
    #[clap(long)]
    topic: Option<String>,
    #[clap(long)]
    stdout_log: bool,
    #[clap(long)]
    connect: Vec<Multiaddr>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let opt = Opt::parse();
    if opt.stdout_log {
        tracing_subscriber::fmt::init();
    }

    let topic = opt.topic.unwrap_or_else(|| String::from("ipfs-chat"));

    let main_topic = Arc::new(tokio::sync::Mutex::new(topic.clone()));

    let keypair = Keypair::generate_ed25519();

    let peer_id = keypair.public().to_peer_id();
    let (mut rl, mut stdout) = Readline::new(format!("{peer_id} >"))?;

    // Initialize the repo and start a daemon
    let mut uninitialized = IpfsBuilder::with_keypair(&keypair)?
        .with_custom_behaviour({
            let stdout = stdout.clone();
            |keypair| {
                Ok(ext_behaviour::Behaviour::new(
                    keypair.public().to_peer_id(),
                    stdout,
                ))
            }
        })
        .with_default()
        .enable_tcp()
        .add_listening_addr("/ip4/0.0.0.0/tcp/0".parse()?);

    if opt.use_mdns {
        uninitialized = uninitialized.with_mdns();
    }

    if opt.use_relay {
        uninitialized = uninitialized.with_relay(true);
    }

    if opt.use_upnp {
        uninitialized = uninitialized.with_upnp();
    }

    let ipfs: Ipfs = uninitialized.start().await?;

    if opt.bootstrap {
        ipfs.default_bootstrap().await?;
        if let Err(_e) = ipfs.bootstrap().await {}
    }

    let cancel = Arc::new(Notify::new());
    if opt.use_relay {
        let bootstrap_nodes = ipfs.get_bootstraps().await.expect("Bootstrap exist");
        let addrs = opt
            .relay_addrs
            .iter()
            .chain(bootstrap_nodes.iter())
            .cloned();

        for mut addr in addrs {
            let peer_id = addr
                .extract_peer_id()
                .expect("Bootstrap to contain peer id");
            ipfs.add_relay(peer_id, addr).await?;
        }

        if let Err(e) = ipfs.enable_relay(None).await {
            writeln!(stdout, "> Error selecting a relay: {e}")?;
        }
    }

    let mut st = ipfs.connection_events().await?;

    let mut listener_st = StreamMap::new();

    ipfs.pubsub_subscribe(topic.clone()).await?;
    let stream = ipfs.pubsub_listener(&topic).await?;

    listener_st.insert(topic.clone(), stream);

    for addr in opt.connect {
        let Some(peer_id) = addr.peer_id() else {
            writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?;
            continue;
        };

        if let Err(e) = ipfs.connect(addr.clone()).await {
            writeln!(stdout, "> Error connecting to {addr}: {e}")?;
            continue;
        }

        writeln!(stdout, "Connected to {}", peer_id)?;
    }

    let owned_topic = topic.to_string();
    tokio::spawn(topic_discovery(ipfs.clone(), owned_topic));

    tokio::task::yield_now().await;

    loop {
        tokio::select! {
            Some((topic, ev)) = listener_st.next() => {
                match ev {
                    GossipsubEvent::Subscribed { peer_id } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
                    GossipsubEvent::Unsubscribed { peer_id } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
                    GossipsubEvent::Message { message } => {
                        writeln!(stdout, "> {topic}: {}: {}", message.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&message.data))?;
                    },
                }
            }
            Some(conn_ev) = st.next() => {
                match conn_ev {
                    ConnectionEvent::ConnectionEstablished { peer_id, .. } => {
                        writeln!(stdout, "> {peer_id} connected")?;
                    }
                    ConnectionEvent::ConnectionClosed{ peer_id, .. } => {
                        writeln!(stdout, "> {peer_id} disconnected")?;
                    }
                }
            }
            line = rl.readline().fuse() => match line {
                Ok(rustyline_async::ReadlineEvent::Line(line)) => {
                    let line = line.trim();
                    if !line.starts_with('/') {
                        if !line.is_empty() {
                            let topic_to_publish = &*main_topic.lock().await;
                            if let Err(e) = ipfs.pubsub_publish(topic_to_publish.clone(), line.as_bytes().to_vec()).await {
                                writeln!(stdout, "> error publishing message: {e}")?;
                                continue;
                            }
                            writeln!(stdout, "{peer_id}: {line}")?;
                        }
                        continue;
                    }

                    let mut command = line.split(' ');

                    match command.next() {
                        Some("/subscribe") => {
                            let topic = match command.next() {
                                Some(topic) => topic.to_string(),
                                None => {
                                    writeln!(stdout, "> topic must be provided")?;
                                    continue;
                                }
                            };
                            let Err(_e) = ipfs.pubsub_subscribe(&topic).await else {
                                writeln!(stdout, "> already subscribed to topic")?;
                                continue;
                            };

                            let event_st = ipfs.pubsub_listener(&topic).await?;


                            listener_st.insert(topic.clone(), event_st);
                            writeln!(stdout, "> subscribed to {}", topic)?;
                            *main_topic.lock().await = topic;
                            continue;
                        }
                        Some("/unsubscribe") => {
                            let topic = match command.next() {
                                Some(topic) => topic.to_string(),
                                None => main_topic.lock().await.clone()
                            };

                            listener_st.remove(&topic);

                            if ipfs.pubsub_unsubscribe(&topic).await.is_err() {
                                writeln!(stdout, "> unable to unsubscribe from {}", topic)?;
                                continue;
                            }

                            writeln!(stdout, "> unsubscribe from {}", topic)?;
                            if let Some(some_topic) = listener_st.keys().next() {
                                *main_topic.lock().await = some_topic.clone();
                                writeln!(stdout, "> setting current topic to {}", some_topic)?;
                            }
                            continue;
                        }
                        Some("/list-topics") => {
                            let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
                            if topics.is_empty() {
                                writeln!(stdout, "> not subscribed to any topics")?;
                                continue;
                            }

                            let current_topic = main_topic.lock().await.clone();

                            writeln!(stdout, "> list of topics")?;
                            for topic in topics {
                                writeln!(stdout, "\t{topic} {}", if current_topic == topic { "- current" } else { "" } )?;
                            }
                        }
                        Some("/set-current-topic") => {
                            let topic = match command.next() {
                                Some(topic) if !topic.is_empty() => topic.to_string(),
                                _ => {
                                    writeln!(stdout, "> topic must be provided")?;
                                    continue;
                                }
                            };

                            let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
                            if topics.is_empty() || !topics.contains(&topic) {
                                writeln!(stdout, "> not subscribed to topic \"{topic}\"")?;
                                continue;
                            }

                            *main_topic.lock().await = topic.clone();

                            writeln!(stdout, "> topic set to {topic}")?;
                        }
                        _ => continue
                    }

                }
                Ok(rustyline_async::ReadlineEvent::Eof) => {
                    cancel.notify_one();
                    break
                },
                Ok(rustyline_async::ReadlineEvent::Interrupted) => {
                    cancel.notify_one();
                    break
                },
                Err(e) => {
                    writeln!(stdout, "Error: {e}")?;
                    writeln!(stdout, "Exiting...")?;
                    break
                },
            }
        }
    }
    // Exit
    ipfs.exit_daemon().await;
    Ok(())
}

//Note: This is temporary as a similar implementation will be used internally in the future
async fn topic_discovery(ipfs: Ipfs, topic: String) -> anyhow::Result<()> {
    let topic_bytes = topic.as_bytes().to_vec();
    ipfs.dht_provide(topic_bytes.clone()).await?;
    loop {
        let mut stream = ipfs.dht_get_providers(topic_bytes.clone()).await?.boxed();
        while let Some(_providers) = stream.next().await {}
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

mod ext_behaviour {
    use connexa::dummy::DummyHandler;
    use connexa::prelude::swarm::derive_prelude::PortUse;
    use connexa::prelude::swarm::{
        ConnectionDenied, FromSwarm, NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent,
        ToSwarm,
    };
    use connexa::prelude::transport::Endpoint;
    use rust_ipfs::{ConnectionId, Multiaddr, NetworkBehaviour, PeerId, Protocol};
    use rustyline_async::SharedWriter;
    use std::convert::Infallible;
    use std::{
        collections::HashSet,
        io::Write,
        task::{Context, Poll},
    };

    pub struct Behaviour {
        addrs: HashSet<Multiaddr>,
        stdout: SharedWriter,
        peer_id: PeerId,
    }

    impl Behaviour {
        pub fn new(local_peer_id: PeerId, mut stdout: SharedWriter) -> Self {
            writeln!(stdout, "PeerID: {}", local_peer_id).expect("");
            Self {
                peer_id: local_peer_id,
                addrs: Default::default(),
                stdout,
            }
        }
    }

    impl NetworkBehaviour for Behaviour {
        type ConnectionHandler = DummyHandler;
        type ToSwarm = Infallible;

        fn handle_pending_inbound_connection(
            &mut self,
            _: ConnectionId,
            _: &Multiaddr,
            _: &Multiaddr,
        ) -> Result<(), ConnectionDenied> {
            Ok(())
        }

        fn handle_pending_outbound_connection(
            &mut self,
            _: ConnectionId,
            _: Option<PeerId>,
            _: &[Multiaddr],
            _: Endpoint,
        ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
            Ok(vec![])
        }

        fn handle_established_inbound_connection(
            &mut self,
            _: ConnectionId,
            _: PeerId,
            _: &Multiaddr,
            _: &Multiaddr,
        ) -> Result<THandler<Self>, ConnectionDenied> {
            Ok(DummyHandler)
        }

        fn handle_established_outbound_connection(
            &mut self,
            _: ConnectionId,
            _: PeerId,
            _: &Multiaddr,
            _: Endpoint,
            _: PortUse,
        ) -> Result<THandler<Self>, ConnectionDenied> {
            Ok(DummyHandler)
        }

        fn on_connection_handler_event(
            &mut self,
            _: PeerId,
            _: ConnectionId,
            _: THandlerOutEvent<Self>,
        ) {
        }

        fn on_swarm_event(&mut self, event: FromSwarm) {
            match event {
                FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => {
                    if self.addrs.insert(addr.clone()) {
                        writeln!(
                            self.stdout,
                            "Listening on {}",
                            addr.clone().with(Protocol::P2p(self.peer_id))
                        )
                        .expect("");
                    }
                }
                FromSwarm::ExternalAddrConfirmed(ev) => {
                    if self.addrs.insert(ev.addr.clone()) {
                        writeln!(self.stdout, "Listening on {}", ev.addr).expect("");
                    }
                }
                _ => {}
            }
        }

        fn poll(&mut self, _: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
            Poll::Pending
        }
    }
}