use super::*;
const OPEN_CHAT_MESSAGES_PER_PAGE: usize = 80;
impl AppCore {
pub(super) fn create_chat(&mut self, peer_input: &str) {
if self.logged_in.is_none() {
self.state.toast = Some("Create or restore a profile first.".to_string());
self.emit_state();
return;
}
if !self.can_use_chats() {
self.state.toast = Some(chat_unavailable_message(self.logged_in.as_ref()).to_string());
self.emit_state();
return;
}
self.state.busy.creating_chat = true;
self.emit_state();
match self.open_direct_chat_from_peer_input(peer_input) {
Ok(chat_id) => {
self.push_debug_log(
"chat.create",
format!("peer_input={} chat_id={chat_id}", peer_input.trim()),
);
}
Err(_) => self.state.toast = Some("Invalid peer key.".to_string()),
}
self.rebuild_state();
self.persist_best_effort();
self.state.busy.creating_chat = false;
self.emit_state();
}
pub(super) fn open_direct_chat_from_peer_input(
&mut self,
peer_input: &str,
) -> anyhow::Result<String> {
let (chat_id, peer_pubkey) = parse_peer_input(peer_input)?;
let now = unix_now().get();
self.prune_expired_messages(now);
self.ensure_thread_record(&chat_id, now).unread_count = 0;
self.load_latest_message_page_for_chat(&chat_id);
if let Some(logged_in) = self.logged_in.as_ref() {
logged_in.ndr_runtime.setup_user(peer_pubkey)?;
}
self.process_runtime_events();
self.active_chat_id = Some(chat_id.clone());
self.screen_stack = vec![Screen::Chat {
chat_id: chat_id.clone(),
}];
self.republish_local_identity_artifacts();
self.request_protocol_subscription_refresh();
self.fetch_recent_protocol_state();
self.schedule_tracked_peer_catch_up(Duration::from_secs(RESUBSCRIBE_CATCH_UP_DELAY_SECS));
Ok(chat_id)
}
pub(super) fn ensure_thread_record(
&mut self,
chat_id: &str,
updated_at_secs: u64,
) -> &mut ThreadRecord {
let thread = self
.threads
.entry(chat_id.to_string())
.or_insert_with(|| ThreadRecord {
chat_id: chat_id.to_string(),
unread_count: 0,
updated_at_secs,
messages: Vec::new(),
});
if thread.updated_at_secs == 0 {
thread.updated_at_secs = updated_at_secs;
}
thread
}
pub(super) fn migrate_verified_device_owner_threads(
&mut self,
owner: PublicKey,
app_keys: &AppKeys,
) {
let owner_hex = owner.to_hex();
for device in app_keys.get_all_devices() {
let device_hex = device.identity_pubkey.to_hex();
if device_hex != owner_hex {
self.migrate_direct_thread_alias(&device_hex, &owner_hex);
}
}
}
fn migrate_direct_thread_alias(&mut self, from_chat_id: &str, to_chat_id: &str) {
if from_chat_id == to_chat_id || !self.threads.contains_key(from_chat_id) {
return;
}
let from_label = self.owner_display_label(from_chat_id);
let to_label = self.owner_display_label(to_chat_id);
let Some(mut source) = self.threads.remove(from_chat_id) else {
return;
};
let target = self
.threads
.entry(to_chat_id.to_string())
.or_insert_with(|| ThreadRecord {
chat_id: to_chat_id.to_string(),
unread_count: 0,
updated_at_secs: source.updated_at_secs,
messages: Vec::new(),
});
for mut message in source.messages.drain(..) {
let duplicate = target.messages.iter().any(|existing| {
existing.id == message.id
|| message
.source_event_id
.as_ref()
.is_some_and(|source_event_id| {
existing.source_event_id.as_ref() == Some(source_event_id)
})
});
if duplicate {
continue;
}
message.chat_id = to_chat_id.to_string();
if !message.is_outgoing
&& (message.author == from_chat_id || message.author == from_label)
{
message.author = to_label.clone();
}
for delivery in &mut message.recipient_deliveries {
if delivery.owner_pubkey_hex == from_chat_id {
delivery.owner_pubkey_hex = to_chat_id.to_string();
}
}
for reactor in &mut message.reactors {
if reactor.author == from_chat_id {
reactor.author = to_chat_id.to_string();
}
}
target.insert_message_sorted(message);
}
target.unread_count = target.unread_count.saturating_add(source.unread_count);
target.updated_at_secs = target.updated_at_secs.max(source.updated_at_secs);
if self.active_chat_id.as_deref() == Some(from_chat_id) {
self.active_chat_id = Some(to_chat_id.to_string());
}
for screen in &mut self.screen_stack {
if let Screen::Chat { chat_id } = screen {
if chat_id == from_chat_id {
*chat_id = to_chat_id.to_string();
}
}
}
if let Some(ttl) = self.chat_message_ttl_seconds.remove(from_chat_id) {
self.chat_message_ttl_seconds
.entry(to_chat_id.to_string())
.or_insert(ttl);
}
for muted in &mut self.preferences.muted_chat_ids {
if muted == from_chat_id {
*muted = to_chat_id.to_string();
}
}
self.preferences.muted_chat_ids.sort();
self.preferences.muted_chat_ids.dedup();
if let Some(floor) = self.typing_floor_secs.remove(from_chat_id) {
self.typing_floor_secs
.entry(to_chat_id.to_string())
.and_modify(|existing| *existing = (*existing).max(floor))
.or_insert(floor);
}
for peer in self.recent_handshake_peers.values_mut() {
if peer.owner_hex == from_chat_id {
peer.owner_hex = to_chat_id.to_string();
}
}
let typing_indicators = std::mem::take(&mut self.typing_indicators);
for (_, mut indicator) in typing_indicators {
if indicator.chat_id == from_chat_id {
indicator.chat_id = to_chat_id.to_string();
}
if indicator.author_owner_hex == from_chat_id {
indicator.author_owner_hex = to_chat_id.to_string();
}
self.typing_indicators.insert(
format!("{}\n{}", indicator.chat_id, indicator.author_owner_hex),
indicator,
);
}
self.mark_mobile_push_dirty();
}
pub(super) fn find_message_chat_id(&self, message_id: &str) -> Option<String> {
self.threads
.iter()
.find(|(_, thread)| {
thread
.messages
.iter()
.any(|message| message.id == message_id)
})
.map(|(chat_id, _)| chat_id.clone())
}
pub(super) fn normalize_chat_id(&self, chat_id: &str) -> Option<String> {
if is_group_chat_id(chat_id) {
let group_id = parse_group_id_from_chat_id(chat_id)?;
let group_chat_id = group_chat_id(&group_id);
if self.groups.contains_key(&group_id) || self.threads.contains_key(&group_chat_id) {
return Some(group_chat_id);
}
return None;
}
parse_peer_input(chat_id)
.ok()
.map(|(normalized, _)| normalized)
}
pub(super) fn open_chat(&mut self, chat_id: &str) {
if !self.can_use_chats() {
self.state.toast = Some(chat_unavailable_message(self.logged_in.as_ref()).to_string());
self.emit_state();
return;
}
let Some(chat_id) = self.normalize_chat_id(chat_id) else {
self.state.toast = Some("Invalid chat id.".to_string());
self.emit_state();
return;
};
let now = unix_now().get();
self.prune_expired_messages(now);
self.ensure_thread_record(&chat_id, now).unread_count = 0;
self.load_latest_message_page_for_chat(&chat_id);
self.active_chat_id = Some(chat_id.clone());
self.screen_stack = vec![Screen::Chat {
chat_id: chat_id.clone(),
}];
self.republish_local_identity_artifacts();
self.rebuild_state();
self.persist_best_effort();
self.request_protocol_subscription_refresh();
self.fetch_recent_protocol_state();
self.schedule_tracked_peer_catch_up(Duration::from_secs(RESUBSCRIBE_CATCH_UP_DELAY_SECS));
self.emit_state();
}
fn load_latest_message_page_for_chat(&mut self, chat_id: &str) {
let messages = match self
.app_store
.load_recent_messages(chat_id, OPEN_CHAT_MESSAGES_PER_PAGE)
{
Ok(messages) => messages,
Err(error) => {
self.push_debug_log(
"storage.messages.page.error",
format!("chat_id={chat_id} error={error}"),
);
return;
}
};
if messages.is_empty() {
return;
}
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
let mut page = messages
.iter()
.map(chat_message_from_persisted)
.collect::<Vec<_>>();
let mut seen = page
.iter()
.map(|message| message.id.clone())
.collect::<HashSet<_>>();
for message in std::mem::take(&mut thread.messages) {
if seen.insert(message.id.clone()) {
page.push(message);
}
}
page.sort_by(|left, right| message_order(left).cmp(&message_order(right)));
thread.messages = page;
}
pub(super) fn send_message(&mut self, chat_id: &str, text: &str, expires_at_secs: Option<u64>) {
let trimmed = text.trim();
if trimmed.is_empty() {
return;
}
if self.logged_in.is_none() {
self.state.toast = Some("Create or restore a profile first.".to_string());
self.emit_state();
return;
}
if !self.can_use_chats() {
self.state.toast = Some(chat_unavailable_message(self.logged_in.as_ref()).to_string());
self.emit_state();
return;
}
let Some(normalized_chat_id) = self.normalize_chat_id(chat_id) else {
self.state.toast = Some("Invalid chat id.".to_string());
self.emit_state();
return;
};
let now = unix_now();
self.active_chat_id = Some(normalized_chat_id.clone());
self.screen_stack = vec![Screen::Chat {
chat_id: normalized_chat_id.clone(),
}];
self.ensure_thread_record(&normalized_chat_id, now.get());
let expires_at_secs = expires_at_secs.or_else(|| {
self.chat_message_ttl_seconds
.get(&normalized_chat_id)
.map(|ttl_seconds| now.get().saturating_add(*ttl_seconds))
});
self.state.busy.sending_message = true;
self.rebuild_state();
self.emit_state();
if is_group_chat_id(&normalized_chat_id) {
self.send_group_message(&normalized_chat_id, trimmed, now, expires_at_secs);
} else {
self.send_direct_message(&normalized_chat_id, trimmed, now, expires_at_secs);
}
self.state.busy.sending_message = false;
self.rebuild_state();
self.persist_best_effort();
self.emit_state();
}
pub(super) fn send_direct_message(
&mut self,
chat_id: &str,
text: &str,
now: UnixSeconds,
expires_at_secs: Option<u64>,
) {
let Ok((normalized_chat_id, peer_pubkey)) = parse_peer_input(chat_id) else {
self.state.toast = Some("Invalid peer key.".to_string());
return;
};
if let Some(logged_in) = self.logged_in.as_ref() {
if let Err(error) = logged_in.ndr_runtime.setup_user(peer_pubkey) {
self.state.toast = Some(error.to_string());
return;
}
}
self.process_runtime_events();
let Some(logged_in) = self.logged_in.as_ref() else {
return;
};
let result = logged_in.ndr_runtime.send_text_with_inner_id(
peer_pubkey,
text.to_string(),
send_options_for_expiration(expires_at_secs),
);
match result {
Ok((inner_id, event_ids)) => {
let message_id = if inner_id.is_empty() {
self.allocate_message_id()
} else {
inner_id
};
let delivery = if event_ids.is_empty() {
DeliveryState::Queued
} else {
DeliveryState::Pending
};
self.push_debug_log(
"message.direct.send",
format!(
"chat_id={normalized_chat_id} message_id={message_id} event_ids={}",
event_ids.len()
),
);
self.push_outgoing_message_with_id(
message_id.clone(),
&normalized_chat_id,
text.to_string(),
now.get(),
expires_at_secs,
delivery,
);
let completions = event_ids
.iter()
.map(|event_id| {
(
event_id.clone(),
(message_id.clone(), normalized_chat_id.clone()),
)
})
.collect::<BTreeMap<_, _>>();
self.process_runtime_events_with_completions(&completions);
self.sync_message_delivery_trace(&normalized_chat_id, &message_id);
if event_ids.is_empty() {
self.request_protocol_subscription_refresh();
if self.fetch_recent_protocol_state() {
self.state.busy.syncing_network = true;
}
}
}
Err(error) => {
self.state.toast = Some(error.to_string());
}
}
}
pub(super) fn send_group_message(
&mut self,
chat_id: &str,
text: &str,
now: UnixSeconds,
expires_at_secs: Option<u64>,
) {
let Some(group_id) = parse_group_id_from_chat_id(chat_id) else {
self.state.toast = Some("Invalid group id.".to_string());
return;
};
let payload = match encode_app_group_message_payload(text) {
Ok(payload) => payload,
Err(error) => {
self.state.toast = Some(error.to_string());
return;
}
};
let message_id = self.allocate_message_id();
let result = self.logged_in.as_ref().map(|logged_in| {
logged_in
.ndr_runtime
.send_group_message(&group_id, payload, Some(message_id.clone()))
});
match result {
Some(Ok(event_ids)) => {
self.push_outgoing_message_with_id(
message_id.clone(),
chat_id,
text.to_string(),
now.get(),
expires_at_secs,
DeliveryState::Pending,
);
let completions = event_ids
.iter()
.map(|event_id| (event_id.clone(), (message_id.clone(), chat_id.to_string())))
.collect::<BTreeMap<_, _>>();
self.process_runtime_events_with_completions(&completions);
self.sync_message_delivery_trace(chat_id, &message_id);
}
Some(Err(error)) => self.state.toast = Some(error.to_string()),
None => {}
}
}
pub(super) fn update_message_delivery(
&mut self,
chat_id: &str,
message_id: &str,
delivery: DeliveryState,
) {
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
if let Some(message) = thread
.messages
.iter_mut()
.find(|message| message.id == message_id)
{
message.delivery = delivery.clone();
if matches!(delivery, DeliveryState::Sent) {
for recipient in &mut message.recipient_deliveries {
if matches!(
recipient.delivery,
DeliveryState::Pending | DeliveryState::Queued
) {
recipient.delivery = DeliveryState::Sent;
recipient.updated_at_secs = unix_now().get();
}
}
}
}
}
pub(super) fn record_message_outer_event(
&mut self,
chat_id: &str,
message_id: &str,
event_id: &str,
target_device_id: Option<&str>,
) {
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
let Some(message) = thread
.messages
.iter_mut()
.find(|message| message.id == message_id)
else {
return;
};
push_unique(&mut message.delivery_trace.outer_event_ids, event_id);
push_unique(
&mut message.delivery_trace.pending_relay_event_ids,
event_id,
);
if let Some(target_device_id) = target_device_id {
push_unique(
&mut message.delivery_trace.target_device_ids,
target_device_id,
);
}
}
pub(super) fn add_message_transport_channel(
&mut self,
chat_id: &str,
message_id: &str,
channel: &str,
) {
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
let Some(message) = thread
.messages
.iter_mut()
.find(|message| message.id == message_id)
else {
return;
};
push_unique(&mut message.delivery_trace.transport_channels, channel);
}
pub(super) fn add_transport_channel_for_event_id(&mut self, event_id: &str, channel: &str) {
for thread in self.threads.values_mut() {
for message in &mut thread.messages {
let matches_source = message.source_event_id.as_deref() == Some(event_id);
let matches_outer = message
.delivery_trace
.outer_event_ids
.iter()
.any(|outer_event_id| outer_event_id == event_id);
if matches_source || matches_outer {
push_unique(&mut message.delivery_trace.transport_channels, channel);
}
}
}
}
pub(super) fn sync_message_delivery_trace(&mut self, chat_id: &str, message_id: &str) {
let pending_relay_event_ids = self
.pending_relay_publishes
.values()
.filter(|pending| {
pending.chat_id.as_deref() == Some(chat_id)
&& pending.message_id.as_deref() == Some(message_id)
})
.map(|pending| pending.event_id.clone())
.collect::<Vec<_>>();
let last_transport_error = self
.pending_relay_publishes
.values()
.filter(|pending| {
pending.chat_id.as_deref() == Some(chat_id)
&& pending.message_id.as_deref() == Some(message_id)
})
.filter_map(|pending| pending.last_error.clone())
.last();
let queued_protocol_targets = self
.logged_in
.as_ref()
.and_then(|logged_in| {
logged_in
.ndr_runtime
.queued_message_diagnostics(Some(message_id))
.ok()
})
.map(|entries| {
entries
.into_iter()
.map(|entry| entry.target_key)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
let Some(message) = thread
.messages
.iter_mut()
.find(|message| message.id == message_id)
else {
return;
};
message.delivery_trace.pending_relay_event_ids = pending_relay_event_ids;
message.delivery_trace.queued_protocol_targets = queued_protocol_targets;
message.delivery_trace.last_transport_error = last_transport_error;
}
pub(super) fn push_outgoing_message_with_id(
&mut self,
message_id: String,
chat_id: &str,
body: String,
created_at_secs: u64,
expires_at_secs: Option<u64>,
delivery: DeliveryState,
) -> ChatMessageSnapshot {
let (body, attachments) = extract_message_attachments(&body);
let message = ChatMessageSnapshot {
id: message_id,
chat_id: chat_id.to_string(),
kind: ChatMessageKind::User,
author: self
.state
.account
.as_ref()
.map(|account| account.display_name.clone())
.unwrap_or_else(|| "me".to_string()),
body,
attachments,
reactions: Vec::new(),
reactors: Vec::new(),
is_outgoing: true,
created_at_secs,
expires_at_secs,
delivery,
recipient_deliveries: self.initial_recipient_deliveries(chat_id, created_at_secs),
delivery_trace: MessageDeliveryTraceSnapshot::default(),
source_event_id: None,
};
self.threads
.entry(chat_id.to_string())
.or_insert_with(|| ThreadRecord {
chat_id: chat_id.to_string(),
unread_count: 0,
updated_at_secs: created_at_secs,
messages: Vec::new(),
})
.insert_message_sorted(message.clone());
if let Some(thread) = self.threads.get_mut(chat_id) {
thread.updated_at_secs = thread.updated_at_secs.max(created_at_secs);
}
self.bump_typing_floor(chat_id, created_at_secs);
if expires_at_secs.is_some() {
self.schedule_next_message_expiry();
}
message
}
#[allow(clippy::too_many_arguments)]
pub(super) fn push_incoming_message_from(
&mut self,
chat_id: &str,
message_id: Option<String>,
body: String,
created_at_secs: u64,
expires_at_secs: Option<u64>,
author: Option<String>,
source_event_id: Option<String>,
) {
let message_id_ref = message_id.as_deref();
let source_event_id_ref = source_event_id.as_deref();
if self.threads.get(chat_id).is_some_and(|thread| {
thread.messages.iter().any(|message| {
message_id_ref.is_some_and(|id| message.id == id)
|| source_event_id_ref.is_some_and(|event_id| {
message.source_event_id.as_deref() == Some(event_id)
})
})
}) {
return;
}
match self
.app_store
.message_exists(chat_id, message_id_ref, source_event_id_ref)
{
Ok(true) => return,
Ok(false) => {}
Err(error) => self.push_debug_log(
"storage.message.exists.error",
format!("chat_id={chat_id} error={error}"),
),
}
let message_id = message_id.unwrap_or_else(|| self.allocate_message_id());
let author = author.unwrap_or_else(|| self.owner_display_label(chat_id));
let should_count_unread = !self.is_chat_visible(chat_id);
let (body, attachments) = extract_message_attachments(&body);
let mut delivery_trace = delivery_trace_for_source_event(source_event_id.as_deref());
if let Some(channel) = source_event_id
.as_ref()
.and_then(|event_id| self.event_transport_channels.remove(event_id))
{
push_unique(&mut delivery_trace.transport_channels, &channel);
}
let message = ChatMessageSnapshot {
id: message_id,
chat_id: chat_id.to_string(),
kind: ChatMessageKind::User,
author,
body,
attachments,
reactions: Vec::new(),
reactors: Vec::new(),
is_outgoing: false,
created_at_secs,
expires_at_secs,
delivery: DeliveryState::Received,
recipient_deliveries: Vec::new(),
delivery_trace,
source_event_id,
};
let (thread_unread_count, thread_updated_at_secs) = {
let thread = self
.threads
.entry(chat_id.to_string())
.or_insert_with(|| ThreadRecord {
chat_id: chat_id.to_string(),
unread_count: 0,
updated_at_secs: created_at_secs,
messages: Vec::new(),
});
if should_count_unread {
thread.unread_count = thread.unread_count.saturating_add(1);
}
thread.updated_at_secs = thread.updated_at_secs.max(created_at_secs);
thread.insert_message_sorted(message.clone());
(thread.unread_count, thread.updated_at_secs)
};
if message.source_event_id.is_some() {
if let Err(error) = self.app_store.upsert_notification_preview_message(
chat_id,
thread_unread_count,
thread_updated_at_secs,
&message,
) {
self.push_debug_log(
"storage.message.preview_upsert.error",
format!("chat_id={chat_id} message_id={} error={error}", message.id),
);
}
}
self.bump_typing_floor(chat_id, created_at_secs);
if expires_at_secs.is_some() {
self.schedule_next_message_expiry();
}
}
pub(super) fn push_system_notice(&mut self, chat_id: &str, body: String, created_at_secs: u64) {
let message_id = self.allocate_message_id();
let should_count_unread = !self.is_chat_visible(chat_id);
let thread = self
.threads
.entry(chat_id.to_string())
.or_insert_with(|| ThreadRecord {
chat_id: chat_id.to_string(),
unread_count: 0,
updated_at_secs: created_at_secs,
messages: Vec::new(),
});
if thread
.messages
.iter()
.any(|message| message.author == "Iris" && message.body == body)
{
return;
}
if should_count_unread {
thread.unread_count = thread.unread_count.saturating_add(1);
}
thread.updated_at_secs = thread.updated_at_secs.max(created_at_secs);
thread.insert_message_sorted(ChatMessageSnapshot {
id: message_id,
chat_id: chat_id.to_string(),
kind: ChatMessageKind::System,
author: "Iris".to_string(),
body,
attachments: Vec::new(),
reactions: Vec::new(),
reactors: Vec::new(),
is_outgoing: false,
created_at_secs,
expires_at_secs: None,
delivery: DeliveryState::Received,
recipient_deliveries: Vec::new(),
delivery_trace: MessageDeliveryTraceSnapshot::default(),
source_event_id: None,
});
self.bump_typing_floor(chat_id, created_at_secs);
}
pub(super) fn delete_local_message(&mut self, chat_id: &str, message_id: &str) {
if chat_id.is_empty() || message_id.is_empty() {
return;
}
let Some(thread) = self.threads.get_mut(chat_id) else {
return;
};
let original_len = thread.messages.len();
thread.messages.retain(|message| message.id != message_id);
if thread.messages.len() == original_len {
return;
}
thread.updated_at_secs = thread
.messages
.last()
.map(|message| message.created_at_secs)
.unwrap_or(thread.updated_at_secs);
if self.active_chat_id.as_deref() == Some(chat_id) {
thread.unread_count = 0;
}
if let Err(error) = self.app_store.delete_message(chat_id, message_id) {
self.push_debug_log(
"storage.message.delete.error",
format!("chat_id={chat_id} message_id={message_id} error={error}"),
);
}
self.persist_best_effort();
self.rebuild_state();
self.emit_state();
}
pub(super) fn delete_chat(&mut self, chat_id: &str) {
if chat_id.is_empty() {
return;
}
let normalized = self
.normalize_chat_id(chat_id)
.unwrap_or_else(|| chat_id.to_string());
let removed_thread = self.threads.remove(&normalized).is_some();
if removed_thread {
if let Err(error) = self.app_store.delete_thread(&normalized) {
self.push_debug_log(
"storage.thread.delete.error",
format!("chat_id={normalized} error={error}"),
);
}
}
self.chat_message_ttl_seconds.remove(&normalized);
self.preferences
.muted_chat_ids
.retain(|chat_id| chat_id != &normalized);
self.mark_mobile_push_dirty();
self.typing_indicators
.retain(|_, indicator| indicator.chat_id != normalized);
self.typing_floor_secs.remove(&normalized);
let removed_group = if let Some(group_id) = parse_group_id_from_chat_id(&normalized) {
let was_present = self.groups.remove(&group_id).is_some();
if was_present {
self.sync_runtime_groups();
}
was_present
} else {
false
};
if !removed_thread && !removed_group {
return;
}
if self.active_chat_id.as_deref() == Some(normalized.as_str()) {
self.active_chat_id = None;
}
self.screen_stack.retain(|screen| match screen {
Screen::Chat { chat_id } => chat_id != &normalized,
Screen::GroupDetails { group_id } => {
parse_group_id_from_chat_id(&normalized).as_deref() != Some(group_id.as_str())
}
_ => true,
});
self.push_debug_log("chat.delete", normalized);
self.rebuild_state();
self.persist_best_effort();
self.emit_state();
}
pub(super) fn send_group_event(
&mut self,
chat_id: &str,
kind: u32,
content: &str,
_tags: Vec<Vec<String>>,
_now_ms: Option<u64>,
) {
if kind == CHAT_MESSAGE_KIND {
self.send_group_message(chat_id, content, unix_now(), None);
}
}
#[cfg(test)]
pub(super) fn apply_decrypted_runtime_message(
&mut self,
sender_owner: PublicKey,
sender_device: Option<PublicKey>,
content: String,
outer_event_id: Option<String>,
) {
self.apply_decrypted_runtime_message_with_metadata(
sender_owner,
sender_device,
None,
content,
outer_event_id,
);
}
pub(super) fn apply_decrypted_runtime_message_with_metadata(
&mut self,
sender_owner: PublicKey,
sender_device: Option<PublicKey>,
conversation_owner: Option<PublicKey>,
content: String,
outer_event_id: Option<String>,
) {
if let Some(logged_in) = self.logged_in.as_ref() {
let group_outcome = logged_in.ndr_runtime.group_handle_incoming_payload_outcome(
content.as_bytes(),
sender_owner,
sender_device,
);
if !group_outcome.events.is_empty() {
for group_event in group_outcome.events {
self.apply_group_decrypted_event(group_event);
}
self.request_protocol_subscription_refresh();
return;
}
if group_outcome.consumed {
self.request_protocol_subscription_refresh();
return;
}
}
let Some(runtime_rumor) = parse_runtime_rumor(&content) else {
let chat_id = self.logged_in.as_ref().and_then(|logged_in| {
direct_self_sync_chat_id(sender_owner, logged_in.owner_pubkey, conversation_owner)
});
self.apply_runtime_text_message(
sender_owner,
chat_id,
content,
unix_now().get(),
None,
outer_event_id.clone(),
outer_event_id,
);
return;
};
let kind = runtime_rumor.kind;
let created_at_secs = runtime_rumor.created_at_secs;
let expires_at_secs = message_expiration_from_tags(runtime_rumor.tags.iter());
let Some(local_owner) = self
.logged_in
.as_ref()
.map(|logged_in| logged_in.owner_pubkey)
else {
return;
};
let chat_id = chat_id_for_runtime_message(
sender_owner,
local_owner,
conversation_owner,
runtime_rumor.tags.iter(),
);
let is_outgoing = sender_owner == local_owner;
let message_id = runtime_rumor.id.or_else(|| outer_event_id.clone());
match kind {
CHAT_MESSAGE_KIND => {
self.apply_runtime_text_message(
sender_owner,
Some(chat_id.clone()),
runtime_rumor.content,
created_at_secs,
expires_at_secs,
message_id.clone(),
outer_event_id.clone(),
);
if !is_outgoing && self.preferences.send_read_receipts {
if let Some(receipt_id) = message_id {
self.send_receipt(&chat_id, "delivered", vec![receipt_id]);
}
}
}
REACTION_KIND => {
let sender_hex = sender_owner.to_hex();
for message_id in message_ids_from_tags(runtime_rumor.tags.iter()) {
self.apply_incoming_reaction_to_chat(
&chat_id,
&message_id,
&sender_hex,
&runtime_rumor.content,
);
}
}
RECEIPT_KIND => {
let delivery = match runtime_rumor.content.as_str() {
"seen" => DeliveryState::Seen,
_ => DeliveryState::Received,
};
self.apply_receipt_to_messages(
&chat_id,
&message_ids_from_tags(runtime_rumor.tags.iter()),
delivery,
is_outgoing,
Some(&sender_owner.to_hex()),
);
}
TYPING_KIND => {
if !is_outgoing {
self.apply_typing_event(
chat_id,
sender_owner.to_hex(),
created_at_secs,
expires_at_secs,
);
}
}
CHAT_SETTINGS_KIND => {
let actor = self.owner_display_label(&sender_owner.to_hex());
self.apply_chat_settings_control(
&chat_id,
&actor,
chat_settings_ttl_seconds(&runtime_rumor.content),
created_at_secs,
);
}
_ => {}
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn apply_runtime_text_message(
&mut self,
sender_owner: PublicKey,
chat_id: Option<String>,
body: String,
created_at_secs: u64,
expires_at_secs: Option<u64>,
message_id: Option<String>,
source_event_id: Option<String>,
) {
let Some(local_owner) = self
.logged_in
.as_ref()
.map(|logged_in| logged_in.owner_pubkey)
else {
return;
};
let chat_id = chat_id.unwrap_or_else(|| sender_owner.to_hex());
self.clear_typing_indicator(&chat_id, &sender_owner.to_hex());
if sender_owner == local_owner {
let message_id = message_id.unwrap_or_else(|| self.allocate_message_id());
if self.threads.get(&chat_id).is_some_and(|thread| {
thread
.messages
.iter()
.any(|message| message.id == message_id)
}) {
self.update_message_delivery(&chat_id, &message_id, DeliveryState::Sent);
} else {
self.push_outgoing_message_with_id(
message_id,
&chat_id,
body,
created_at_secs,
expires_at_secs,
DeliveryState::Sent,
);
}
return;
}
self.push_incoming_message_from(
&chat_id,
message_id,
body,
created_at_secs,
expires_at_secs,
Some(self.owner_display_label(&sender_owner.to_hex())),
source_event_id,
);
}
pub(super) fn allocate_message_id(&mut self) -> String {
let id = self.next_message_id;
self.next_message_id = self.next_message_id.saturating_add(1);
id.to_string()
}
fn initial_recipient_deliveries(
&self,
chat_id: &str,
created_at_secs: u64,
) -> Vec<MessageRecipientDeliverySnapshot> {
let local_owner = self
.logged_in
.as_ref()
.map(|logged_in| logged_in.owner_pubkey.to_hex());
let mut recipients = if let Some(group_id) = parse_group_id_from_chat_id(chat_id) {
self.groups
.get(&group_id)
.map(|group| {
group
.members
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default()
} else {
vec![chat_id.to_string()]
};
recipients.retain(|owner| local_owner.as_deref() != Some(owner.as_str()));
recipients.sort();
recipients.dedup();
recipients
.into_iter()
.map(|owner_pubkey_hex| MessageRecipientDeliverySnapshot {
owner_pubkey_hex,
delivery: DeliveryState::Pending,
updated_at_secs: created_at_secs,
})
.collect()
}
}
fn chat_message_from_persisted(message: &PersistedMessage) -> ChatMessageSnapshot {
let (body, parsed_attachments) = extract_message_attachments(&message.body);
ChatMessageSnapshot {
id: message.id.clone(),
chat_id: message.chat_id.clone(),
kind: message.kind.clone(),
author: message.author.clone(),
body,
attachments: if message.attachments.is_empty() {
parsed_attachments
} else {
message.attachments.clone()
},
reactions: message.reactions.clone(),
reactors: message.reactors.clone(),
is_outgoing: message.is_outgoing,
created_at_secs: message.created_at_secs,
expires_at_secs: message.expires_at_secs,
delivery: message.delivery.clone().into(),
recipient_deliveries: message.recipient_deliveries.clone(),
delivery_trace: message.delivery_trace.clone(),
source_event_id: message.source_event_id.clone(),
}
}
fn delivery_trace_for_source_event(source_event_id: Option<&str>) -> MessageDeliveryTraceSnapshot {
let mut trace = MessageDeliveryTraceSnapshot::default();
if let Some(source_event_id) = source_event_id {
trace.outer_event_ids.push(source_event_id.to_string());
}
trace
}
fn push_unique(values: &mut Vec<String>, value: &str) {
if values.iter().any(|existing| existing == value) {
return;
}
values.push(value.to_string());
}
fn message_order(message: &ChatMessageSnapshot) -> (u64, u64, &str) {
(
message.created_at_secs,
message.id.parse::<u64>().unwrap_or(u64::MAX),
message.id.as_str(),
)
}