//! squawkbox 0.1.4
//!
//! DLC contract communication over Nostr.
mod dlc;
pub mod notification;

use ddk::chain::EsploraClient;
use ddk::error::TransportError;
use ddk::nostr::messages::{create_dlc_msg_event, handle_dlc_msg_event};
use ddk::wallet::DlcDevKitWallet;
use ddk::{Oracle, Storage, Transport};
use ddk_manager::{CachedContractSignerProvider, SimpleSigner, SystemTimeProvider};
use nostr_relay_pool::relay::limits::{RelayEventLimits, RelayMessageLimits};
use nostr_sdk::pool::RelayLimits;
use nostr_sdk::{Client, Options, RelayPoolNotification};
use nostr_sdk::{Keys, Timestamp, Url};
use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;

use crate::notification::{SquawkboxNotification, SquawkboxNotificationKind, contract_prefix};

/// DlcDevKit type alias for the [`ddk_manager::manager::Manager`].
///
/// Fixes every generic parameter of the manager to the concrete DlcDevKit
/// types listed below, leaving only two open:
///
/// * `S` – shared as `Arc<S>`; [`Squawkbox::start`] instantiates it with
///   `S: Storage`.
/// * `O` – shared as `Arc<O>`; [`Squawkbox::start`] instantiates it with
///   `O: Oracle`.
///
/// NOTE(review): the role of each positional parameter is defined by
/// `ddk_manager::manager::Manager`; confirm against that type before
/// reordering anything here.
pub type DlcDevKitDlcManager<S, O> = ddk_manager::manager::Manager<
    Arc<DlcDevKitWallet>,
    Arc<CachedContractSignerProvider<Arc<DlcDevKitWallet>, SimpleSigner>>,
    Arc<EsploraClient>,
    Arc<S>,
    Arc<O>,
    Arc<SystemTimeProvider>,
    Arc<DlcDevKitWallet>,
    SimpleSigner,
>;

/// Nostr-backed endpoint for exchanging DLC messages with a counterparty.
///
/// Built by [`Squawkbox::new`]; [`Squawkbox::start`] spawns the task that
/// listens for incoming DLC messages.
pub struct Squawkbox {
    /// Nostr keys: installed as the client's signer, used to build the
    /// incoming-message filter, to decode received DLC events and to create
    /// reply events.
    pub keys: Keys,
    /// Relay URL parsed from the `relay_host` argument of [`Squawkbox::new`].
    pub relay_url: Url,
    /// Nostr client that `relay_url` was added to.
    pub client: Client,
    /// Broadcast channel on which [`SquawkboxNotification`]s are published
    /// while messages are processed.
    pub notification_sender: broadcast::Sender<SquawkboxNotification>,
}

impl Squawkbox {
    pub async fn new(
        keys: &Keys,
        relay_host: &str,
        notification_sender: broadcast::Sender<SquawkboxNotification>,
    ) -> anyhow::Result<Squawkbox> {
        let relay_url = relay_host.parse()?;
        let mut event_limits = RelayEventLimits::default();
        event_limits.max_size = Some(5242880);
        let mut message_limits = RelayMessageLimits::default();
        message_limits.max_size = Some(5242880);

        let mut relay_limits = RelayLimits::default();
        relay_limits.events = event_limits;
        relay_limits.messages = message_limits;

        let options = Options::new().relay_limits(relay_limits);

        let client = Client::builder().signer(keys.clone()).opts(options).build();
        client.add_relay(&relay_url).await?;
        client.connect().await;

        Ok(Squawkbox {
            keys: keys.clone(),
            relay_url,
            client,
            notification_sender,
        })
    }

    pub fn start<S: Storage, O: Oracle>(
        &self,
        mut stop_signal: watch::Receiver<bool>,
        manager: Arc<DlcDevKitDlcManager<S, O>>,
    ) -> JoinHandle<Result<(), TransportError>> {
        tracing::info!(
            nostr_pubkey = self.keys.public_key().to_string(),
            transport_public_key = self.public_key().to_string(),
            "Starting Nostr DLC listener."
        );
        let nostr_client = self.client.clone();
        let keys = self.keys.clone();
        let notification_sender = self.notification_sender.clone();
        tokio::spawn(async move {
            let since = Timestamp::now();
            let msg_subscription =
                ddk::nostr::messages::create_dlc_message_filter(since, keys.public_key());
            nostr_client
                .subscribe(msg_subscription, None)
                .await
                .map_err(|e| TransportError::Listen(e.to_string()))?;
            tracing::info!(
                "Listening for messages on {}",
                keys.public_key().to_string()
            );
            let mut notifications = nostr_client.notifications();
            loop {
                tokio::select! {
                    _ = stop_signal.changed() => {
                        if *stop_signal.borrow() {
                            tracing::warn!("Stopping nostr dlc message subscription.");
                            nostr_client.disconnect().await;
                            break;
                        }
                    },
                    Ok(notification) = notifications.recv() => {
                        if let RelayPoolNotification::Event {
                            relay_url: _,
                            subscription_id: _,
                            event,
                        } = notification {
                            let (pubkey, message, event) = match handle_dlc_msg_event(
                                &event,
                                keys.secret_key(),
                            ) {
                                Ok(msg) => {
                                    tracing::info!(pubkey=msg.0.to_string(), "Received DLC nostr message.");

                                    if let Err(e) = notification_sender.send(SquawkboxNotification {
                                        kind: SquawkboxNotificationKind::ReceivedMessage,
                                        counterparty: event.pubkey.to_string(),
                                        contract_id: event.id.to_string(),
                                        state: contract_prefix(&msg.1),
                                        message: None,
                                    }) {
                                        tracing::error!("Error sending notification: {}", e);
                                    }

                                    (msg.0, msg.1, msg.2)
                                },
                                Err(_) => {
                                    tracing::error!("Could not parse event {}", event.id);
                                    continue;
                                }
                            };

                            match manager.on_dlc_message(&message, pubkey).await {
                                Ok(Some(msg)) => {
                                    let event = create_dlc_msg_event(
                                        event.pubkey,
                                        Some(event.id),
                                        msg,
                                        &keys,
                                    )
                                    .expect("no message");
                                    nostr_client
                                        .send_event(&event)
                                        .await
                                        .expect("Break out into functions.");
                                    if let Err(e) = notification_sender.send(SquawkboxNotification {
                                        kind: SquawkboxNotificationKind::SendingMessage,
                                        counterparty: event.pubkey.to_string(),
                                        contract_id: event.id.to_string(),
                                        state: contract_prefix(&message),
                                        message: None,
                                    }) {
                                        tracing::error!("Error sending notification: {}", e);
                                    }
                                }
                                Ok(None) => (),
                                Err(e) => {
                                    if let Err(e) = notification_sender.send(SquawkboxNotification {
                                        kind: SquawkboxNotificationKind::Error,
                                        counterparty: event.pubkey.to_string(),
                                        contract_id: event.id.to_string(),
                                        state: 0,
                                        message: Some(e.to_string()),
                                    }) {
                                        tracing::error!("Error sending notification: {}", e);
                                    }
                                }
                            }
                        }
                    }
                }
            }
            Ok::<_, TransportError>(())
        })
    }
}