mod components;
mod database;
pub mod prelude;
mod upload;
use database::{fetch_events_from_db, on_db_fetch};
use upload::{FileUploadReqEvent, FileUploadSuccess, upload_file};
use ::url::Url;
use async_channel::{Receiver, Sender, bounded};
use bevy::prelude::*;
use bevy_async_task::TaskRunner;
use nostr::Event as NEvent;
use nostr_sdk::prelude::*;
use std::{path::PathBuf, sync::Arc, task::Poll, time::Duration};
struct EvNotification {
event: NEvent,
sub_id: SubscriptionId,
}
#[derive(Resource)]
struct NostrState {
event_tx: Sender<NEvent>,
#[allow(dead_code)]
event_rx: Receiver<NEvent>,
ev_notif_tx: Sender<EvNotification>,
ev_notif_rx: Receiver<EvNotification>,
file_upload_tx: Sender<FileUploadSuccess>,
file_upload_rx: Receiver<FileUploadSuccess>,
started: bool,
}
#[derive(Resource)]
struct NostrConfig {
db_path: PathBuf,
web_db_name: String,
}
#[derive(Resource)]
pub struct NostrClientConfig {
pub read_relays: Vec<String>,
pub write_relays: Vec<String>,
}
#[derive(Resource)]
pub struct NostrClientConfigured;
#[derive(Resource)]
pub struct NostrClient {
pub client: Arc<Client>,
}
#[derive(Message)]
pub struct NostrEventNotification {
pub event: NEvent,
pub sub_id: SubscriptionId,
}
#[derive(Event)]
pub struct FilterSubscribe {
filter: Filter,
id: Option<SubscriptionId>,
name: Option<String>,
}
#[derive(Component, Debug)]
pub struct FilterSubscription {
pub id: SubscriptionId,
pub filter: Filter,
}
impl FilterSubscription {
pub fn new(filter: Filter) -> Self {
let id = SubscriptionId::generate();
Self { id, filter }
}
}
#[derive(Component, Debug)]
pub struct FilterSubscriptionName(pub String);
#[derive(Event)]
pub struct FetchEvents {
filter: Filter,
}
impl FetchEvents {
pub fn new(filter: Filter) -> Self {
Self { filter }
}
}
#[derive(Event)]
pub struct SendEventBuilder {
pub builder: EventBuilder,
}
impl FilterSubscribe {
pub fn new(filter: Filter) -> Self {
Self {
filter,
name: None,
id: None,
}
}
pub fn with_name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
pub fn with_id(mut self, id: SubscriptionId) -> Self {
self.id = Some(id);
self
}
}
impl std::default::Default for NostrState {
fn default() -> Self {
let (event_tx, event_rx) = bounded(256);
let (ev_notif_tx, ev_notif_rx) = bounded(256);
let (file_upload_tx, file_upload_rx) = bounded(16);
Self {
event_tx,
event_rx,
ev_notif_tx,
ev_notif_rx,
file_upload_tx,
file_upload_rx,
started: false,
}
}
}
#[derive(Resource, Default)]
pub struct NostrPlugin {
pub db_path: Option<PathBuf>,
pub web_db_name: Option<String>,
}
static DEFAULT_RELAYS: [&str; 7] = [
"wss://nostr.mom/",
"wss://relay.primal.net/",
"wss://relay.nos.social/",
"wss://nos.lol/",
"wss://relay.damus.io/",
"wss://bitcoiner.social/",
"wss://offchain.pub/",
];
impl std::default::Default for NostrClientConfig {
fn default() -> Self {
Self {
write_relays: DEFAULT_RELAYS
.into_iter()
.map(|s| s.to_string())
.collect(),
read_relays: DEFAULT_RELAYS
.into_iter()
.map(|s| s.to_string())
.collect(),
}
}
}
impl NostrClientConfig {
pub fn custom(relays: Vec<&str>) -> Self {
Self {
read_relays: relays
.clone()
.into_iter()
.map(|s| s.to_string())
.collect(),
write_relays: relays
.clone()
.into_iter()
.map(|s| s.to_string())
.collect(),
}
}
}
impl Plugin for NostrPlugin {
fn build(&self, app: &mut App) {
let config = NostrConfig {
db_path: self
.db_path
.clone()
.unwrap_or(PathBuf::from("bevy_nostr")),
web_db_name: self
.web_db_name
.clone()
.unwrap_or(String::from("bevy_nostr")),
};
app.init_resource::<NostrState>()
.insert_resource(config)
.add_message::<NostrEventNotification>()
.add_message::<FileUploadSuccess>()
.add_observer(on_filter_subscribe)
.add_observer(on_file_upload_req)
.add_observer(on_send_event_builder)
.add_observer(on_fetch_events)
.add_systems(
Update,
setup_nostr_client.run_if(not(resource_exists::<NostrClient>)),
)
.add_systems(
Update,
setup_nostr_relays.run_if(
resource_exists::<NostrClientConfig>
.and(resource_exists::<NostrClient>.and(not(
resource_exists::<NostrClientConfigured>,
))),
),
)
.add_systems(
Update,
fetch_events_from_db.run_if(resource_exists::<NostrClient>),
)
.add_observer(on_db_fetch)
.add_systems(
Update,
setup_nostr_notifications.run_if(resource_added::<NostrClient>),
)
.add_systems(
Update,
read_nostr_notifications.run_if(resource_exists::<NostrClient>),
)
.add_systems(
Update,
read_file_upload_events.run_if(resource_exists::<NostrClient>),
);
}
}
fn setup_nostr_client(
mut task_runner: TaskRunner<'_, Client>,
mut nostr_state: ResMut<NostrState>,
config: Res<NostrConfig>,
mut commands: Commands,
) {
let db_path = config.db_path.clone();
#[allow(unused_variables)]
let db_name = config.web_db_name.clone();
if task_runner.is_idle() && !nostr_state.started {
task_runner.start(async move {
#[cfg(not(target_arch = "wasm32"))]
let database = NostrLMDB::open(db_path).unwrap();
#[cfg(target_arch = "wasm32")]
let database = WebDatabase::open(db_name).await.unwrap();
Client::builder().database(database).build()
});
nostr_state.started = true;
}
match task_runner.poll() {
Poll::Ready(client) => {
commands.insert_resource(NostrClient {
client: Arc::new(client),
});
}
Poll::Pending => {}
}
}
fn setup_nostr_relays(
mut task_runner: TaskRunner<'_, ()>,
nostr_client: ResMut<NostrClient>,
config: Res<NostrClientConfig>,
mut commands: Commands,
) {
let client = nostr_client.client.clone();
let read_relays = config.read_relays.clone();
let write_relays = config.write_relays.clone();
if task_runner.is_idle() {
task_runner.start(async move {
client.force_remove_all_relays().await;
for relay in read_relays {
if let Err(_) = client.add_read_relay(&relay).await {
warn!("Failed to add relay {relay}");
}
}
for relay in write_relays {
if let Err(_) = client.add_write_relay(&relay).await {
warn!("Failed to add relay {relay}");
}
}
client.connect().await;
});
}
match task_runner.poll() {
Poll::Ready(_) => commands.insert_resource(NostrClientConfigured),
Poll::Pending => {}
}
}
fn setup_nostr_notifications(
mut task_runner: TaskRunner<'_, ()>,
nostr_state: ResMut<NostrState>,
nostr_client: ResMut<NostrClient>,
) {
let client = nostr_client.client.clone();
let tx = nostr_state.ev_notif_tx.clone();
task_runner.start(async move {
let _ = client
.handle_notifications(|notification| async {
if let RelayPoolNotification::Event {
subscription_id,
event,
..
} = notification
{
let _ = tx.try_send(EvNotification {
event: *event,
sub_id: subscription_id,
});
} else if let RelayPoolNotification::Message {
relay_url: _,
message,
..
} = notification
{
if let RelayMessage::Event {
subscription_id,
event,
} = message
{
let _ = tx.try_send(EvNotification {
event: event.into_owned(),
sub_id: subscription_id.into_owned(),
});
}
}
Ok(false)
})
.await;
});
}
fn read_nostr_notifications(
nostr_state: ResMut<NostrState>,
mut msg_writer: MessageWriter<NostrEventNotification>,
) {
while let Ok(pkg) = nostr_state.ev_notif_rx.try_recv() {
msg_writer.write(NostrEventNotification {
event: pkg.event,
sub_id: pkg.sub_id,
});
}
}
fn on_filter_subscribe(
event: On<FilterSubscribe>,
mut commands: Commands,
mut task_runner: TaskRunner<'_, ()>,
nostr_client: ResMut<NostrClient>,
) {
let client = nostr_client.client.clone();
let filter = event.filter.clone();
let sub_id = SubscriptionId::generate();
let id = commands
.spawn(FilterSubscription {
id: sub_id.clone(),
filter: filter.clone(),
})
.id();
if let Some(name) = &event.name {
commands
.entity(id)
.insert(FilterSubscriptionName(name.clone()));
}
task_runner.start(async move {
if let Err(err) = client.subscribe_with_id(sub_id, filter, None).await {
warn!("Error subscribing to filter: {err}");
}
});
}
fn on_send_event_builder(
event: On<SendEventBuilder>,
mut task_runner: TaskRunner<'_, ()>,
nostr_client: ResMut<NostrClient>,
) {
let client = nostr_client.client.clone();
let builder = event.builder.clone();
task_runner.start(async move {
if let Err(err) = client.send_event_builder(builder).await {
warn!("Error sending event: {err}");
}
});
}
fn on_fetch_events(
event: On<FetchEvents>,
nostr_state: ResMut<NostrState>,
mut task_runner: TaskRunner<'_, ()>,
nostr_client: ResMut<NostrClient>,
) {
let client = nostr_client.client.clone();
let filter = event.filter.clone();
let tx = nostr_state.event_tx.clone();
task_runner.start(async move {
if let Ok(events) =
client.fetch_events(filter, Duration::from_secs(60)).await
{
for event in events {
let _ = tx.try_send(event);
}
}
});
}
fn on_file_upload_req(
event: On<FileUploadReqEvent>,
mut task_runner: TaskRunner<'_, Option<Url>>,
nostr_state: ResMut<NostrState>,
nostr_client: ResMut<NostrClient>,
) {
let client = nostr_client.client.clone();
let file_data = event.file_data.clone();
let tx = nostr_state.file_upload_tx.clone();
let upload_id = event.id.clone();
task_runner.start(async move {
let Ok(signer) = client.signer().await else {
return None;
};
match upload_file(file_data, None, signer).await {
Ok(url) => {
if let Err(_) =
tx.try_send(FileUploadSuccess { url, id: upload_id })
{
warn!("Channel send error");
}
None
}
Err(e) => {
warn!("Error uploading file to nip96 server: {e:?}");
None
}
}
});
}
fn read_file_upload_events(
nostr_state: ResMut<NostrState>,
mut msg_writer: MessageWriter<FileUploadSuccess>,
) {
if let Ok(event) = nostr_state.file_upload_rx.try_recv() {
msg_writer.write(event);
}
}
pub fn change_nostr_signer(
mut task_runner: TaskRunner<'_, ()>,
nostr_client: ResMut<NostrClient>,
signer: Arc<dyn NostrSigner>,
) {
let client = nostr_client.client.clone();
task_runner.start(async move {
client.set_signer(signer).await;
});
}