use crate::actions::AppAction;
use crate::state::{
AccountSnapshot, AppState, ChatKind, ChatMessageKind, ChatMessageSnapshot, ChatThreadSnapshot,
CurrentChatSnapshot, DeliveryState, DeviceAuthorizationState, DeviceEntrySnapshot,
DeviceRosterSnapshot, GroupDetailsSnapshot, GroupMemberSnapshot, LinkDeviceSnapshot,
MessageAttachmentSnapshot, MessageDeliveryTraceSnapshot, MessageReactionSnapshot,
MessageReactor, MessageRecipientDeliverySnapshot, MobilePushNotificationResolution,
MobilePushSubscriptionRequest, MobilePushSyncSnapshot, NetworkStatusSnapshot,
OutgoingAttachment, PeerProfileDebugSnapshot, PreferencesSnapshot, PublicInviteSnapshot,
RelayConnectionSnapshot, Router, Screen, TypingIndicatorSnapshot,
};
use crate::updates::{AppUpdate, CoreMsg, InternalEvent, RelayPublishDrainResult};
use flume::Sender;
use nostr::{Alphabet, EventBuilder, SingleLetterTag, UnsignedEvent};
use nostr_double_ratchet::{
AuthorizedDevice, DevicePubkey as NdrDevicePubkey, DeviceRoster, GroupIncomingEvent,
GroupManagerSnapshot, GroupPendingFanout, GroupPreparedPublish, GroupPreparedSend,
GroupProtocol, GroupSenderKeyHandleResult, GroupSenderKeyMessage, GroupSnapshot, Invite,
MessageEnvelope, OwnerPubkey as NdrOwnerPubkey, PreparedSend, ProtocolContext, RelayGap,
SessionManager, SessionManagerSnapshot, SessionState, UnixSeconds as NdrUnixSeconds,
};
use nostr_double_ratchet_nostr::{
apply_app_keys_snapshot_with_required_device, is_app_keys_event, AppKeys, DeviceEntry,
NostrGroupManager, APP_KEYS_EVENT_KIND, CHAT_MESSAGE_KIND, CHAT_SETTINGS_KIND,
GROUP_SENDER_KEY_MESSAGE_KIND, INVITE_EVENT_KIND, INVITE_RESPONSE_KIND, MESSAGE_EVENT_KIND,
REACTION_KIND, RECEIPT_KIND, TYPING_KIND,
};
use nostr_double_ratchet_nostr::{
group_sender_key_message_event, invite_response_event, message_event,
parse_group_sender_key_message_event, parse_invite_event, parse_invite_response_event,
parse_message_event,
};
use nostr_double_ratchet_pairwise_codec as pairwise_codec;
use nostr_double_ratchet_runtime::StorageAdapter;
use nostr_sdk::prelude::{
Client, Event, Filter, Keys, Kind, PublicKey, RelayNotification, RelayPoolNotification,
RelayStatus, RelayUrl, SubscribeOptions, SubscriptionId, Timestamp, ToBech32,
};
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, sleep_until, Duration, Instant};
mod account;
mod attachment_upload;
mod attachments;
mod chat_reactions;
mod chat_receipts;
mod chat_settings;
mod chat_typing;
mod chats;
mod config;
mod groups;
mod identity;
mod invites;
mod lifecycle;
mod message_expiry;
mod mobile_push;
mod model;
mod nearby;
mod payloads;
mod persistence;
mod profile;
mod profile_helpers;
mod projection;
mod protocol;
mod protocol_engine;
mod protocol_filters;
mod publish_helpers;
mod publishing;
mod relay;
mod routing;
mod storage;
mod support;
#[cfg(test)]
mod tests;
pub(crate) const NEARBY_PRESENCE_KIND: u16 = 22242;
pub(super) const APPCORE_PROTOCOL_LABEL: &str = "appcore-protocol";
pub(super) const APPCORE_PROTOCOL_BOOTSTRAP_LABEL: &str = "appcore-protocol-bootstrap";
pub(super) const APPCORE_PROTOCOL_FIRST_CONTACT_LABEL: &str = "appcore-protocol-first-contact";
type OwnerPubkey = PublicKey;
type DevicePubkey = PublicKey;
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct UnixSeconds(u64);
impl UnixSeconds {
pub(super) fn get(self) -> u64 {
self.0
}
}
use account::known_app_keys_from_ndr;
use account::known_app_keys_to_ndr;
use attachment_upload::upload_profile_picture_to_hashtree;
use attachments::*;
use config::*;
pub(crate) use config::{
app_version_string, build_summary, configured_relays, relay_set_id, trusted_test_build_flag,
};
use identity::*;
pub(crate) use identity::{normalize_peer_input_for_display, parse_peer_input};
pub(crate) use mobile_push::{
build_mobile_push_create_subscription_request, build_mobile_push_delete_subscription_request,
build_mobile_push_list_subscriptions_request, build_mobile_push_update_subscription_request,
decrypt_mobile_push_notification, mobile_push_stored_subscription_id_key,
resolve_mobile_push_notification, resolve_mobile_push_server_url,
};
pub(crate) use model::ProtocolSubscriptionPlan;
use model::*;
use payloads::*;
use profile_helpers::*;
use protocol_engine::*;
use protocol_filters::*;
use publish_helpers::*;
use storage::{open_database, AppStore, DataDirLock, SqliteStorageAdapter};
pub(crate) use storage::{search_messages_fts, PersistedMessageSearchHit, SharedConnection};
pub(crate) fn chat_snapshot_from_state_and_db(
state: &AppState,
shared_db: Option<&SharedConnection>,
chat_id: &str,
limit: usize,
) -> Option<CurrentChatSnapshot> {
let chat_id = chat_id.trim();
if chat_id.is_empty() || state.account.is_none() {
return None;
}
if let Some(current) = state
.current_chat
.as_ref()
.filter(|chat| chat.chat_id == chat_id)
{
return Some(current.clone());
}
build_chat_snapshot_with_messages(
state,
shared_db,
chat_id,
ChatPageRequest::Latest {
limit: limit.max(1),
},
)
}
pub(crate) fn chat_snapshot_before_from_state_and_db(
state: &AppState,
shared_db: Option<&SharedConnection>,
chat_id: &str,
before_message_id: &str,
limit: usize,
) -> Option<CurrentChatSnapshot> {
build_chat_snapshot_with_messages(
state,
shared_db,
chat_id,
ChatPageRequest::Before {
before_message_id,
limit: limit.max(1),
},
)
}
pub(crate) fn chat_snapshot_around_message_from_state_and_db(
state: &AppState,
shared_db: Option<&SharedConnection>,
chat_id: &str,
message_id: &str,
before_limit: usize,
after_limit: usize,
) -> Option<CurrentChatSnapshot> {
build_chat_snapshot_with_messages(
state,
shared_db,
chat_id,
ChatPageRequest::Around {
message_id,
before_limit,
after_limit,
},
)
}
enum ChatPageRequest<'a> {
Latest {
limit: usize,
},
Before {
before_message_id: &'a str,
limit: usize,
},
Around {
message_id: &'a str,
before_limit: usize,
after_limit: usize,
},
}
fn build_chat_snapshot_with_messages(
state: &AppState,
shared_db: Option<&SharedConnection>,
chat_id: &str,
request: ChatPageRequest<'_>,
) -> Option<CurrentChatSnapshot> {
let chat_id = chat_id.trim();
if chat_id.is_empty() || state.account.is_none() {
return None;
}
let thread = state.chat_list.iter().find(|chat| chat.chat_id == chat_id);
let messages = load_chat_messages(shared_db, chat_id, request)?;
let group_id = group_id_from_chat_id(chat_id);
Some(CurrentChatSnapshot {
chat_id: chat_id.to_string(),
kind: thread
.map(|thread| thread.kind.clone())
.unwrap_or_else(|| chat_kind_for_id(chat_id)),
display_name: thread
.map(|thread| thread.display_name.clone())
.unwrap_or_else(|| fallback_chat_title(chat_id)),
subtitle: thread
.and_then(|thread| thread.subtitle.clone())
.or_else(|| group_id.as_ref().map(|_| "Group".to_string())),
picture_url: thread.and_then(|thread| thread.picture_url.clone()),
group_id,
member_count: thread.map(|thread| thread.member_count).unwrap_or(0),
message_ttl_seconds: None,
is_muted: thread.map(|thread| thread.is_muted).unwrap_or(false),
messages,
typing_indicators: Vec::new(),
draft: thread
.map(|thread| thread.draft.clone())
.unwrap_or_default(),
is_request: thread.map(|thread| thread.is_request).unwrap_or(false),
})
}
fn load_chat_messages(
shared_db: Option<&SharedConnection>,
chat_id: &str,
request: ChatPageRequest<'_>,
) -> Option<Vec<ChatMessageSnapshot>> {
let Some(shared) = shared_db else {
return match request {
ChatPageRequest::Latest { .. } => Some(Vec::new()),
ChatPageRequest::Before { .. } | ChatPageRequest::Around { .. } => None,
};
};
let Ok(conn) = shared.try_lock() else {
return match request {
ChatPageRequest::Latest { .. } => Some(Vec::new()),
ChatPageRequest::Before { .. } | ChatPageRequest::Around { .. } => None,
};
};
let result = match request {
ChatPageRequest::Latest { limit } => storage::load_recent_messages(&conn, chat_id, limit),
ChatPageRequest::Before {
before_message_id,
limit,
} => storage::load_messages_before(&conn, chat_id, before_message_id, limit),
ChatPageRequest::Around {
message_id,
before_limit,
after_limit,
} => storage::load_messages_around(&conn, chat_id, message_id, before_limit, after_limit),
};
let Ok(messages) = result else {
return match request {
ChatPageRequest::Latest { .. } => Some(Vec::new()),
ChatPageRequest::Before { .. } | ChatPageRequest::Around { .. } => None,
};
};
Some(
messages
.iter()
.map(chats::chat_message_from_persisted)
.collect(),
)
}
fn group_id_from_chat_id(chat_id: &str) -> Option<String> {
chat_id
.strip_prefix("group:")
.filter(|group_id| !group_id.trim().is_empty())
.map(ToString::to_string)
}
fn fallback_chat_title(chat_id: &str) -> String {
if is_group_chat_id(chat_id) {
return "Group".to_string();
}
let trimmed = chat_id.trim();
let boundary = trimmed
.char_indices()
.map(|(index, _)| index)
.nth(12)
.unwrap_or(trimmed.len());
if boundary < trimmed.len() {
format!("{}...", &trimmed[..boundary])
} else {
trimmed.to_string()
}
}
pub struct AppCore {
update_tx: Sender<AppUpdate>,
core_sender: Sender<CoreMsg>,
shared_state: Arc<RwLock<AppState>>,
runtime: tokio::runtime::Runtime,
data_dir: PathBuf,
state: AppState,
logged_in: Option<LoggedInState>,
protocol_engine: Option<ProtocolEngine>,
pending_linked_device: Option<PendingLinkedDeviceState>,
private_chat_invites: BTreeMap<String, Invite>,
threads: BTreeMap<String, ThreadRecord>,
active_chat_id: Option<String>,
screen_stack: Vec<Screen>,
next_message_id: u64,
owner_profiles: BTreeMap<String, OwnerProfileRecord>,
app_keys: BTreeMap<String, KnownAppKeys>,
groups: BTreeMap<String, GroupSnapshot>,
typing_indicators: BTreeMap<String, TypingIndicatorRecord>,
typing_floor_secs: BTreeMap<String, u64>,
chat_message_ttl_seconds: BTreeMap<String, u64>,
preferences: PreferencesSnapshot,
recent_handshake_peers: BTreeMap<String, RecentHandshakePeer>,
seen_event_ids: HashSet<String>,
seen_event_order: VecDeque<String>,
device_invite_poll_token: u64,
message_expiry_token: u64,
protocol_reconnect_token: u64,
defer_owner_app_keys_publish: bool,
protocol_subscription_runtime: ProtocolSubscriptionRuntime,
relay_transport_runtime: RelayTransportRuntime,
relay_status_watch_urls: HashSet<String>,
relay_status_watch_generation: u64,
relay_status_by_url: BTreeMap<String, RelayStatus>,
relay_connected_count: u64,
all_relays_offline_since_secs: Option<u64>,
pending_relay_publishes: BTreeMap<String, PendingRelayPublish>,
pending_relay_publish_inflight: HashSet<String>,
pending_decrypted_delivery_acks: HashSet<String>,
event_transport_channels: BTreeMap<String, String>,
pending_mobile_push_events: VecDeque<Event>,
debug_log: VecDeque<DebugLogEntry>,
debug_event_counters: DebugEventCounters,
debug_snapshot_write_generation: u64,
debug_snapshot_write_inflight: bool,
debug_snapshot_write_dirty: bool,
debug_snapshot_last_built_at_ms: u64,
debug_snapshot_build_count: u64,
batch_depth: u32,
batch_dirty_state: bool,
batch_dirty_persist: bool,
pending_outgoing_receipts: BTreeMap<(String, String), Vec<String>>,
last_emitted_state: Option<AppState>,
app_store: AppStore,
_data_dir_lock: DataDirLock,
cached_mobile_push: MobilePushSyncSnapshot,
mobile_push_dirty: bool,
suspended: bool,
}
async fn connect_client_with_timeout(client: &Client, timeout: Duration) {
client.connect().await;
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if connected_relay_count_for_client(client).await > 0 {
return;
}
sleep(Duration::from_millis(250)).await;
}
}
async fn connected_relay_count_for_client(client: &Client) -> usize {
client
.relays()
.await
.values()
.filter(|relay| relay.status() == RelayStatus::Connected)
.count()
}