//! speer-chat 0.1.0
//!
//! Terminal chat app for speer.
use crate::app::{AppState, NetLevel, PeerHandle};
use crate::constants::{
    CHAT_PROTO, CHAT_SERVICE_TYPE, CHAT_TYPE_BYE, CHAT_TYPE_HELLO, CHAT_TYPE_MSG,
    HANDSHAKE_TIMEOUT_MS,
};
use crate::ffi::{
    boxed_zeroed, c_char_array_to_string, cstring, identity_raw, tcp_dial, SessionPtr, StreamPtr,
};
use crate::files::handle_file_frame;
use crate::protocol::{chat_frame_decode, is_file_frame, recv_chat_frame, send_chat_frame};
use crate::util::truncate_pid;
use crate::AnyResult;
use speer::sys;
use std::ffi::{c_char, c_void, CStr};
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Registers a new peer for the connection on `fd` and runs its worker on a
/// dedicated thread.
///
/// * `initiator` - `true` when this side dialed the connection.
/// * `fd` - TCP descriptor for the connection; it is closed here if the peer
///   cannot be registered, otherwise it is handed to `peer_worker`.
/// * `addr` - textual remote address stored in the peer handle.
///
/// When `AppState::add_peer` rejects the handle, the error "too many peers" is
/// emitted and no thread is started. Any error returned by the worker is
/// reported through `emit_error`; afterwards the peer is always flagged dead
/// and inactive.
pub fn start_peer(app: Arc<AppState>, initiator: bool, fd: i32, addr: String) {
    let handle = PeerHandle::new(initiator, addr);
    let registered = app.add_peer(handle.clone());

    if registered {
        thread::spawn(move || {
            let outcome = peer_worker(app.clone(), handle.clone(), fd);
            if let Err(err) = outcome {
                app.emit_error(format!("peer error: {err}"));
            }
            // Whatever happened in the worker, this peer is finished.
            handle.dead.store(true, Ordering::Relaxed);
            let mut info = handle.info.lock().unwrap();
            info.active = false;
        });
    } else {
        // SAFETY: no worker was spawned, so nothing else in this function
        // uses `fd` after it is closed.
        unsafe { sys::speer_tcp_close(fd) };
        app.emit_error("too many peers");
    }
}

/// Drives one peer connection from handshake to teardown on the calling thread.
///
/// Steps, in order:
/// 1. Initialise the libp2p TCP session on `fd` as dialer or listener,
///    depending on `peer.info.initiator`.
/// 2. Read the remote peer id from the session; a connection to ourselves is
///    closed and treated as success.
/// 3. Open (initiator) or accept (listener) the `CHAT_PROTO` stream.
/// 4. Send a HELLO frame, spawn `peer_reader` for incoming frames, and pump
///    the peer's outgoing queue until `app.quit` or `peer.dead` is set.
/// 5. Tear the connection down and emit a leave event.
///
/// # Errors
///
/// Returns an error when session initialisation or chat protocol negotiation
/// fails.
fn peer_worker(app: Arc<AppState>, peer: PeerHandle, fd: i32) -> AnyResult<()> {
    // Bound the handshake phase; the timeout is reset once the chat stream is up.
    unsafe {
        sys::speer_tcp_set_io_timeout(fd, HANDSHAKE_TIMEOUT_MS);
    }

    // The session struct lives in `session_box`; `session` is a raw-pointer
    // wrapper into it, so the box must stay alive until the final close below.
    let mut session_box = unsafe { boxed_zeroed::<sys::speer_libp2p_tcp_session_t>() };
    let session = SessionPtr(session_box.as_mut() as *mut _);
    let identity = identity_raw(&app.identity);
    let initiator = peer.info.lock().unwrap().initiator;
    let rc = unsafe {
        if initiator {
            app.netlog(
                NetLevel::Info,
                format!("dial tcp {}", peer.info.lock().unwrap().addr),
            );
            sys::speer_libp2p_tcp_session_init_dialer(session.0, fd, &identity)
        } else {
            app.netlog(
                NetLevel::Info,
                format!("accept tcp {}", peer.info.lock().unwrap().addr),
            );
            sys::speer_libp2p_tcp_session_init_listener(session.0, fd, &identity)
        }
    };
    // NOTE(review): on this failure path neither `fd` nor the session is closed
    // here; whether the init functions clean up after themselves is not visible
    // from this file — confirm against the speer library.
    if rc != 0 {
        return Err("tcp/noise/yamux session init failed".into());
    }

    let remote_pid = unsafe { c_char_array_to_string(&(*session.0).remote_peer_id_b58) };
    // The remote side reported our own peer id: we connected to ourselves.
    if remote_pid == app.identity.peer_id {
        unsafe { sys::speer_libp2p_tcp_session_close(session.0) };
        return Ok(());
    }

    // Record the remote identity on the shared peer info.
    {
        let mut info = peer.info.lock().unwrap();
        info.remote_pid_full = remote_pid.clone();
        info.remote_pid_short = truncate_pid(&remote_pid);
    }
    app.netlog(
        NetLevel::Ok,
        format!("peer id {}", truncate_pid(&remote_pid)),
    );

    // Negotiate the chat stream: the initiator opens it, the listener accepts it.
    let chat_proto = cstring(CHAT_PROTO);
    let mut chat_stream = ptr::null_mut();
    let rc = unsafe {
        if initiator {
            sys::speer_libp2p_tcp_open_protocol_stream(
                session.0,
                chat_proto.as_ptr(),
                &mut chat_stream,
            )
        } else {
            let protocols = [chat_proto.as_ptr()];
            let mut selected = 0usize;
            // NOTE(review): the trailing `50` is presumably a poll interval in
            // milliseconds — confirm against the accept function's signature.
            sys::speer_libp2p_tcp_accept_protocol_stream(
                session.0,
                protocols.as_ptr(),
                protocols.len(),
                &mut selected,
                &mut chat_stream,
                HANDSHAKE_TIMEOUT_MS,
                50,
            )
        }
    };
    if rc != 0 || chat_stream.is_null() {
        unsafe { sys::speer_libp2p_tcp_session_close(session.0) };
        return Err("chat protocol negotiation failed".into());
    }

    let stream = StreamPtr(chat_stream);
    {
        let mut info = peer.info.lock().unwrap();
        info.handshake_done = true;
        info.connected_at = Some(Instant::now());
        info.last_seen = info.connected_at;
    }

    // The remote nick is not known yet, so the join is announced with a placeholder.
    app.emit_join("(unknown)", &truncate_pid(&remote_pid));
    app.netlog(NetLevel::Ok, "opened chat stream");

    // Best-effort HELLO carrying our nick; a send failure is ignored here.
    let nick = app.nick();
    if let Ok(bytes) = send_chat_frame(session, stream, CHAT_TYPE_HELLO, &nick, "") {
        peer.info.lock().unwrap().bytes_tx += bytes as u64;
        app.netlog(NetLevel::Traffic, format!("tx hello {bytes}B"));
    }

    // Handshake finished: replace the handshake timeout
    // (presumably 0 disables the I/O timeout — TODO confirm).
    unsafe {
        sys::speer_tcp_set_io_timeout(fd, 0);
    }

    // Incoming frames are handled on a separate thread sharing the same
    // session and stream pointers.
    let reader_app = app.clone();
    let reader_peer = peer.clone();
    let reader = thread::spawn(move || {
        peer_reader(reader_app, reader_peer, session, stream);
    });

    // Outgoing pump: drain the peer's queue, then sleep 50 ms and check again.
    while !app.quit.load(Ordering::Relaxed) && !peer.dead.load(Ordering::Relaxed) {
        while let Some(msg) = peer.dequeue() {
            let nick = app.nick();
            match send_chat_frame(session, stream, msg.kind, &nick, &msg.text) {
                Ok(bytes) => {
                    let mut info = peer.info.lock().unwrap();
                    info.bytes_tx += bytes as u64;
                    info.last_seen = Some(Instant::now());
                    if msg.kind == CHAT_TYPE_MSG {
                        info.msgs_tx += 1;
                    }
                    app.netlog(NetLevel::Traffic, format!("tx frame {bytes}B"));
                }
                Err(_) => {
                    // `break` only leaves the inner loop; the outer loop ends
                    // on its next condition check because `dead` is now set.
                    peer.dead.store(true, Ordering::Relaxed);
                    break;
                }
            }
        }
        thread::sleep(Duration::from_millis(50));
    }

    // Teardown. The socket is closed before joining the reader —
    // presumably so a reader blocked in a receive call returns; confirm.
    peer.dead.store(true, Ordering::Relaxed);
    unsafe {
        sys::speer_tcp_close((*session.0).fd);
        (*session.0).fd = -1;
    }
    let _ = reader.join();
    // Only after the reader has stopped is the session closed and its memory freed.
    unsafe {
        sys::speer_libp2p_tcp_session_close(session.0);
    }
    drop(session_box);

    // Announce the departure using the nick if one was learned, else the short peer id.
    let nick = {
        let info = peer.info.lock().unwrap();
        if info.remote_nick.is_empty() {
            info.remote_pid_short.clone()
        } else {
            info.remote_nick.clone()
        }
    };
    app.emit_leave(&nick);
    Ok(())
}

/// Receive loop for one peer; runs until the app quits, the peer is marked
/// dead, a receive fails, or a BYE frame arrives.
///
/// For every frame that decodes successfully the peer's receive counters,
/// `last_seen` timestamp and (when the frame carries one) nickname are
/// updated, then the frame is dispatched by kind:
///
/// * `CHAT_TYPE_MSG` - counted and forwarded through `emit_chat`.
/// * `CHAT_TYPE_HELLO` - announced through `emit_join`, now that a nick may
///   be known.
/// * `CHAT_TYPE_BYE` - ends the loop.
/// * file frames - delegated to `handle_file_frame`.
///
/// Frames that fail to decode are skipped. On exit the peer is marked dead.
///
/// The peer-info mutex is never held while calling into `app`: every value
/// needed for an event is copied out under a short lock first. A guard
/// created inside a call's argument list would otherwise stay alive until
/// the call returns.
fn peer_reader(app: Arc<AppState>, peer: PeerHandle, session: SessionPtr, stream: StreamPtr) {
    while !app.quit.load(Ordering::Relaxed) && !peer.dead.load(Ordering::Relaxed) {
        let Ok((frame, wire_len)) = recv_chat_frame(session, stream) else {
            break;
        };
        let Ok((kind, nick, text)) = chat_frame_decode(&frame) else {
            continue;
        };

        {
            let mut info = peer.info.lock().unwrap();
            info.bytes_rx += wire_len as u64;
            info.last_seen = Some(Instant::now());
            if !nick.is_empty() {
                info.remote_nick = nick.clone();
            }
        }

        match kind {
            CHAT_TYPE_MSG => {
                let (pid, display) = {
                    let mut info = peer.info.lock().unwrap();
                    info.msgs_rx += 1;
                    (
                        info.remote_pid_full.clone(),
                        if info.remote_nick.is_empty() {
                            info.remote_pid_short.clone()
                        } else {
                            info.remote_nick.clone()
                        },
                    )
                };
                app.emit_chat(&pid, &display, &text);
                app.netlog(NetLevel::Traffic, format!("rx chat {wire_len}B"));
            }
            CHAT_TYPE_HELLO => {
                // Take both values under a single lock and release it before
                // emitting the event.
                let (display, pid_short) = {
                    let info = peer.info.lock().unwrap();
                    let display = if info.remote_nick.is_empty() {
                        info.remote_pid_short.clone()
                    } else {
                        info.remote_nick.clone()
                    };
                    (display, truncate_pid(&info.remote_pid_full))
                };
                app.emit_join(&display, &pid_short);
            }
            CHAT_TYPE_BYE => break,
            kind if is_file_frame(kind) => handle_file_frame(&app, &peer, kind, &text),
            _ => {}
        }
    }
    peer.dead.store(true, Ordering::Relaxed);
}

/// Extracts the IPv4 host and TCP port from a multiaddr-style string such as
/// `/ip4/192.168.1.5/tcp/4001/p2p/<peer-id>`.
///
/// The host is the text between the first `/ip4/` and the first `/tcp/` that
/// follows it; the port is the text after that `/tcp/` up to the next `/` or
/// the end of the string. Text before `/ip4/` is ignored.
///
/// Returns `None` when either marker is missing, when `/tcp/` does not come
/// after `/ip4/`, when the host is empty, or when the port is not a valid
/// `u16`. The function never panics, whatever the input.
pub fn parse_multiaddr(multiaddr: &str) -> Option<(String, u16)> {
    const IP_MARKER: &str = "/ip4/";
    const TCP_MARKER: &str = "/tcp/";

    // Look for the tcp marker only in the text after the ip marker, so a
    // reordered or overlapping address cannot yield an inverted slice range.
    let (_, after_ip) = multiaddr.split_once(IP_MARKER)?;
    let (host, after_tcp) = after_ip.split_once(TCP_MARKER)?;
    if host.is_empty() {
        return None;
    }

    // `split` always yields at least one item, so this is the whole remainder
    // when no further component follows the port.
    let port_text = after_tcp.split('/').next()?;
    let port = port_text.parse().ok()?;
    Some((host.to_string(), port))
}

/// State handed to the mDNS discovery callback through its opaque `user`
/// pointer.
struct DiscoveryCtx {
    /// Shared application state the callback reads and uses to start peers.
    app: Arc<AppState>,
}

/// mDNS discovery callback: decides whether to dial a newly seen peer.
///
/// A dial is attempted only when all of the following hold, checked in this
/// order:
/// 1. none of the three pointers is null;
/// 2. the advertised peer id is non-empty and is not our own;
/// 3. no connected peer already has that id;
/// 4. the id was not yet in `app.attempted` (it is inserted here);
/// 5. our own peer id does not sort after the remote one, so only one side of
///    a pair dials;
/// 6. the multiaddr parses into a host and port.
///
/// # Safety
///
/// `user` must be null or point to a live `DiscoveryCtx`; `peer_id` and
/// `multiaddr` must each be null or point to a NUL-terminated C string that
/// stays valid for the duration of the call.
unsafe extern "C" fn on_mdns_discover(
    user: *mut c_void,
    peer_id: *const c_char,
    multiaddr: *const c_char,
) {
    let any_null = user.is_null() || peer_id.is_null() || multiaddr.is_null();
    if any_null {
        return;
    }

    // SAFETY: all three pointers were checked for null above; their validity
    // is the caller's obligation (see `# Safety`).
    let ctx = unsafe { &*(user as *const DiscoveryCtx) };
    let app = &ctx.app;
    let remote_id = unsafe { CStr::from_ptr(peer_id) }
        .to_string_lossy()
        .into_owned();
    let remote_addr = unsafe { CStr::from_ptr(multiaddr) }
        .to_string_lossy()
        .into_owned();

    let own_id = app.identity.peer_id.as_str();
    if remote_id.is_empty() || remote_id.as_str() == own_id {
        return;
    }

    let already_connected = app
        .connected_peers()
        .iter()
        .any(|p| p.info.lock().unwrap().remote_pid_full == remote_id);
    if already_connected {
        return;
    }

    // `insert` reports whether the id was new; anything seen before is skipped.
    let first_sighting = app.attempted.lock().unwrap().insert(remote_id.clone());
    if !first_sighting {
        return;
    }

    // Tie-break: the side whose id sorts after the other's does not dial.
    if own_id > remote_id.as_str() {
        return;
    }

    let (host, port) = match parse_multiaddr(&remote_addr) {
        Some(endpoint) => endpoint,
        None => return,
    };
    app.netlog(NetLevel::Info, format!("mDNS {remote_addr}"));

    match tcp_dial(&host, port) {
        Some(fd) => {
            start_peer(app.clone(), true, fd, format!("{host}:{port}"));
        }
        None => {
            app.netlog(NetLevel::Warn, format!("dial failed {host}:{port}"));
        }
    }
}

pub fn discovery_thread(
    app: Arc<AppState>,
    listen_fd: i32,
    instance_name: String,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut mctx = unsafe { boxed_zeroed::<sys::mdns_ctx_t>() };
        if unsafe { sys::mdns_init(mctx.as_mut()) } != 0 {
            app.emit_error("mdns init failed");
            return;
        }

        let lan_ip = app.lan_ip.lock().unwrap().clone();
        let port = app.listen_port.load(Ordering::Relaxed);
        let multiaddr = format!("/ip4/{lan_ip}/tcp/{port}/p2p/{}", app.identity.peer_id);
        let txt = format!("dnsaddr={multiaddr}");
        let mut txt_data = Vec::with_capacity(txt.len() + 1);
        txt_data.push(txt.len().min(255) as u8);
        txt_data.extend_from_slice(txt.as_bytes());

        let instance = cstring(&instance_name);
        let service = cstring(CHAT_SERVICE_TYPE);
        let rc = unsafe {
            sys::mdns_register_service(
                mctx.as_mut(),
                instance.as_ptr(),
                service.as_ptr(),
                port,
                txt_data.as_ptr(),
                txt_data.len(),
            )
        };
        if rc != 0 {
            app.emit_error("mdns register failed");
            unsafe { sys::mdns_free(mctx.as_mut()) };
            return;
        }

        let mut ctx = Box::new(DiscoveryCtx { app: app.clone() });
        unsafe {
            sys::mdns_set_discovery_callback(
                mctx.as_mut(),
                Some(on_mdns_discover),
                ctx.as_mut() as *mut _ as *mut c_void,
            );
        }

        let query = cstring(&format!("{CHAT_SERVICE_TYPE}.local"));
        unsafe {
            sys::mdns_announce(mctx.as_mut());
            sys::mdns_query(mctx.as_mut(), query.as_ptr());
            sys::speer_tcp_set_nonblocking(listen_fd, 1);
        }

        let mut announce_acc = 0;
        while !app.quit.load(Ordering::Relaxed) {
            let mut fd = -1;
            let mut peer_addr = [0 as c_char; 64];
            let rc = unsafe {
                sys::speer_tcp_accept(listen_fd, &mut fd, peer_addr.as_mut_ptr(), peer_addr.len())
            };
            if rc == 0 && fd >= 0 {
                let addr = crate::ffi::c_char_array_to_string(&peer_addr);
                app.netlog(NetLevel::Info, format!("tcp accept {addr}"));
                start_peer(app.clone(), false, fd, addr);
            }

            announce_acc += 100;
            if announce_acc >= 1000 {
                unsafe {
                    sys::mdns_announce(mctx.as_mut());
                    sys::mdns_query(mctx.as_mut(), query.as_ptr());
                }
                announce_acc = 0;
            }
            unsafe {
                sys::mdns_poll(mctx.as_mut(), 100);
            }
        }

        unsafe {
            sys::mdns_unregister_service(mctx.as_mut(), instance.as_ptr());
            sys::mdns_free(mctx.as_mut());
        }
    })
}