#![deny(unreachable_pub)]
#[cfg(feature = "experimental-encrypted-state-events")]
use std::borrow::Borrow;
use std::future::IntoFuture;
use eyeball::SharedObservable;
use matrix_sdk_common::boxed_into_future;
use mime::Mime;
#[cfg(doc)]
use ruma::events::{MessageLikeUnsigned, SyncMessageLikeEvent};
use ruma::{
OwnedTransactionId, TransactionId,
api::client::message::send_message_event,
assign,
events::{AnyMessageLikeEventContent, MessageLikeEventContent},
serde::Raw,
};
#[cfg(feature = "experimental-encrypted-state-events")]
use ruma::{
api::client::state::send_state_event,
events::{AnyStateEventContent, StateEventContent},
};
use tracing::{Instrument, Span, info, trace};
use super::Room;
#[cfg(feature = "experimental-encrypted-state-events")]
use crate::utils::IntoRawStateEventContent;
use crate::{
Result, TransmissionProgress, attachment::AttachmentConfig, config::RequestConfig,
utils::IntoRawMessageLikeEventContent,
};
/// Builder for sending a typed message-like event to a room.
///
/// The content is serialized to JSON when the builder is created; the request
/// itself is only performed once the value is awaited (see the `IntoFuture`
/// implementation).
// No `Debug` implementation is provided for this type, so the
// `missing_debug_implementations` lint is silenced here.
#[allow(missing_debug_implementations)]
pub struct SendMessageLikeEvent<'a> {
/// Room the event is sent to.
room: &'a Room,
/// Event type string, taken from the content's `event_type()`.
event_type: String,
/// Outcome of serializing the content to JSON; a serialization error is
/// kept here and surfaced when the future runs.
content: serde_json::Result<serde_json::Value>,
/// Optional caller-chosen transaction ID, forwarded to the raw send.
transaction_id: Option<OwnedTransactionId>,
/// Optional request configuration, forwarded to the raw send.
request_config: Option<RequestConfig>,
}
impl<'a> SendMessageLikeEvent<'a> {
    /// Creates a builder for sending `content` to `room`.
    ///
    /// The event type is read from the content and the content is serialized
    /// to JSON right away; a serialization failure is stored and only
    /// reported once the builder is awaited.
    pub(crate) fn new(room: &'a Room, content: impl MessageLikeEventContent) -> Self {
        Self {
            room,
            event_type: content.event_type().to_string(),
            content: serde_json::to_value(&content),
            transaction_id: None,
            request_config: None,
        }
    }

    /// Sets the transaction ID to use for this send.
    pub fn with_transaction_id(self, txn_id: OwnedTransactionId) -> Self {
        Self { transaction_id: Some(txn_id), ..self }
    }

    /// Sets the [`RequestConfig`] to use for this send.
    pub fn with_request_config(self, request_config: RequestConfig) -> Self {
        Self { request_config: Some(request_config), ..self }
    }
}
impl<'a> IntoFuture for SendMessageLikeEvent<'a> {
    type Output = Result<send_message_event::v3::Response>;
    boxed_into_future!(extra_bounds: 'a);

    /// Turns the builder into a future that sends the event.
    ///
    /// A serialization error recorded at construction time is returned
    /// first; otherwise the JSON content is handed to `Room::send_raw`
    /// together with the configured transaction ID and request config.
    fn into_future(self) -> Self::IntoFuture {
        let Self { room, event_type, content: serialized, transaction_id, request_config } = self;
        let send = async move {
            let raw_send = room.send_raw(&event_type, serialized?);
            assign!(raw_send, { transaction_id, request_config }).await
        };
        Box::pin(send)
    }
}
/// Builder for sending a raw (already serialized) message-like event to a
/// room.
///
/// The request is only performed once the value is awaited (see the
/// `IntoFuture` implementation).
// No `Debug` implementation is provided for this type, so the
// `missing_debug_implementations` lint is silenced here.
#[allow(missing_debug_implementations)]
pub struct SendRawMessageLikeEvent<'a> {
/// Room the event is sent to.
room: &'a Room,
/// Event type string supplied by the caller.
event_type: &'a str,
/// Raw event content.
content: Raw<AnyMessageLikeEventContent>,
/// Tracing span that was current when the builder was created; the send
/// future is instrumented with it.
tracing_span: Span,
/// Optional caller-chosen transaction ID; a new one is generated at send
/// time when this is `None`.
transaction_id: Option<OwnedTransactionId>,
/// Optional request configuration applied to the HTTP request.
request_config: Option<RequestConfig>,
}
impl<'a> SendRawMessageLikeEvent<'a> {
    /// Creates a builder for sending raw `content` of type `event_type` to
    /// `room`.
    ///
    /// The content is converted to its raw form immediately and the
    /// currently active tracing span is captured for later use.
    pub(crate) fn new(
        room: &'a Room,
        event_type: &'a str,
        content: impl IntoRawMessageLikeEventContent,
    ) -> Self {
        Self {
            room,
            event_type,
            content: content.into_raw_message_like_event_content(),
            tracing_span: Span::current(),
            transaction_id: None,
            request_config: None,
        }
    }

    /// Sets the transaction ID to use for this send.
    ///
    /// The ID is copied into an owned value.
    pub fn with_transaction_id(self, txn_id: &TransactionId) -> Self {
        Self { transaction_id: Some(txn_id.to_owned()), ..self }
    }

    /// Sets the [`RequestConfig`] to use for this send.
    pub fn with_request_config(self, request_config: RequestConfig) -> Self {
        Self { request_config: Some(request_config), ..self }
    }
}
impl<'a> IntoFuture for SendRawMessageLikeEvent<'a> {
type Output = Result<send_message_event::v3::Response>;
boxed_into_future!(extra_bounds: 'a);
/// Turns the builder into a future that sends the raw event.
///
/// The future first checks that the room is joined. With the
/// `e2e-encryption` feature enabled and an encrypted room, every event type
/// except `m.reaction` is encrypted and sent as `m.room.encrypted`;
/// otherwise the content is sent as-is.
///
/// # Panics
///
/// Panics in the encryption path if the client's Olm machine is not
/// available.
fn into_future(self) -> Self::IntoFuture {
// `event_type` and `content` are only reassigned inside the encryption
// branch, which is compiled out without the `e2e-encryption` feature.
#[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
let Self {
room,
mut event_type,
mut content,
tracing_span,
transaction_id,
request_config,
} = self;
let fut = async move {
room.ensure_room_joined()?;
// Fall back to a freshly generated transaction ID when the caller did
// not supply one.
let txn_id = transaction_id.unwrap_or_else(TransactionId::new);
Span::current().record("transaction_id", tracing::field::debug(&txn_id));
#[cfg(not(feature = "e2e-encryption"))]
trace!("Sending plaintext event to room because we don't have encryption support.");
#[cfg(feature = "e2e-encryption")]
if room.latest_encryption_state().await?.is_encrypted() {
Span::current().record("is_room_encrypted", true);
// Reactions are sent in plaintext even when the room is encrypted.
if event_type == "m.reaction" {
trace!("Sending plaintext event because of the event type.");
} else {
trace!(
room_id = ?room.room_id(),
"Sending encrypted event because the room is encrypted.",
);
// Sync members, query keys and pre-share the room key before
// encrypting.
ensure_room_encryption_ready(room).await?;
let olm = room.client.olm_machine().await;
let olm = olm.as_ref().expect("Olm machine wasn't started");
// Replace the payload with its encrypted form, then send it under
// the encrypted event type instead of the original one.
content = olm
.encrypt_room_event_raw(room.room_id(), event_type, &content)
.await?
.cast();
event_type = "m.room.encrypted";
}
} else {
Span::current().record("is_room_encrypted", false);
trace!("Sending plaintext event because the room is NOT encrypted.");
}
let request = send_message_event::v3::Request::new_raw(
room.room_id().to_owned(),
txn_id,
event_type.into(),
content,
);
let response = room.client.send(request).with_request_config(request_config).await?;
Span::current().record("event_id", tracing::field::debug(&response.event_id));
info!("Sent event in room");
Ok(response)
};
// Run the request inside the span captured when the builder was created.
Box::pin(fut.instrument(tracing_span))
}
}
/// Builder for sending an attachment to a room.
///
/// The work is only performed once the value is awaited (see the
/// `IntoFuture` implementation).
// No `Debug` implementation is provided for this type, so the
// `missing_debug_implementations` lint is silenced here.
#[allow(missing_debug_implementations)]
pub struct SendAttachment<'a> {
/// Room the attachment is sent to.
room: &'a Room,
/// File name of the attachment.
filename: String,
/// MIME type of the attachment.
content_type: &'a Mime,
/// Raw bytes of the attachment.
data: Vec<u8>,
/// Attachment configuration supplied by the caller.
config: AttachmentConfig,
/// Tracing span that was current when the builder was created; the send
/// future is instrumented with it.
tracing_span: Span,
/// Observable handed to the send routine for transmission progress.
send_progress: SharedObservable<TransmissionProgress>,
/// Flag forwarded to the send routine; defaults to `false`.
// NOTE(review): presumably controls caching of the uploaded media — confirm
// against `Room::prepare_and_send_attachment`.
store_in_cache: bool,
}
impl<'a> SendAttachment<'a> {
    /// Creates a builder for sending `data` as an attachment named
    /// `filename` with the given MIME type and configuration.
    ///
    /// Progress reporting starts with a default observable, caching is
    /// disabled, and the currently active tracing span is captured.
    pub(crate) fn new(
        room: &'a Room,
        filename: String,
        content_type: &'a Mime,
        data: Vec<u8>,
        config: AttachmentConfig,
    ) -> Self {
        let tracing_span = Span::current();
        let send_progress = Default::default();
        Self {
            room,
            filename,
            content_type,
            data,
            config,
            tracing_span,
            send_progress,
            store_in_cache: false,
        }
    }

    /// Replaces the default progress observable with the one supplied by the
    /// caller.
    pub fn with_send_progress_observable(
        self,
        send_progress: SharedObservable<TransmissionProgress>,
    ) -> Self {
        Self { send_progress, ..self }
    }

    /// Turns on the `store_in_cache` flag that is passed to the send routine.
    pub fn store_in_cache(self) -> Self {
        Self { store_in_cache: true, ..self }
    }
}
impl<'a> IntoFuture for SendAttachment<'a> {
    type Output = Result<send_message_event::v3::Response>;
    boxed_into_future!(extra_bounds: 'a);

    /// Turns the builder into a future that forwards every setting to
    /// `Room::prepare_and_send_attachment`, instrumented with the span
    /// captured when the builder was created.
    fn into_future(self) -> Self::IntoFuture {
        let Self {
            room,
            filename,
            content_type,
            data,
            config,
            tracing_span: span,
            send_progress,
            store_in_cache,
        } = self;

        Box::pin(
            async move {
                let sending = room.prepare_and_send_attachment(
                    filename,
                    content_type,
                    data,
                    config,
                    send_progress,
                    store_in_cache,
                );
                sending.await
            }
            .instrument(span),
        )
    }
}
/// Builder for sending a raw (already serialized) state event to a room.
///
/// The request is only performed once the value is awaited (see the
/// `IntoFuture` implementation).
#[cfg(feature = "experimental-encrypted-state-events")]
// No `Debug` implementation is provided for this type, so the
// `missing_debug_implementations` lint is silenced here.
#[allow(missing_debug_implementations)]
pub struct SendRawStateEvent<'a> {
/// Room the event is sent to.
room: &'a Room,
/// Event type string supplied by the caller.
event_type: &'a str,
/// State key supplied by the caller.
state_key: &'a str,
/// Raw event content.
content: Raw<AnyStateEventContent>,
/// Tracing span that was current when the builder was created; the send
/// future is instrumented with it.
tracing_span: Span,
/// Optional request configuration applied to the HTTP request.
request_config: Option<RequestConfig>,
}
#[cfg(feature = "experimental-encrypted-state-events")]
impl<'a> SendRawStateEvent<'a> {
    /// Creates a builder for sending raw state `content` of type
    /// `event_type` under `state_key` to `room`.
    ///
    /// The content is converted to its raw form immediately and the
    /// currently active tracing span is captured for later use.
    pub(crate) fn new(
        room: &'a Room,
        event_type: &'a str,
        state_key: &'a str,
        content: impl IntoRawStateEventContent,
    ) -> Self {
        Self {
            room,
            event_type,
            state_key,
            content: content.into_raw_state_event_content(),
            tracing_span: Span::current(),
            request_config: None,
        }
    }

    /// Sets the [`RequestConfig`] to use for this send.
    pub fn with_request_config(self, request_config: RequestConfig) -> Self {
        Self { request_config: Some(request_config), ..self }
    }

    /// Decides whether a state event of type `event_type` must be encrypted
    /// before it is sent to `room`.
    ///
    /// Returns `false` when the room's encryption state does not report
    /// encrypted state events, or when the event type is on the list of
    /// types that are always sent in plaintext; returns `true` otherwise.
    fn should_encrypt(room: &Room, event_type: &str) -> bool {
        /// Event types that are never encrypted.
        const PLAINTEXT_ONLY_TYPES: &[&str] = &[
            "m.room.create",
            "m.room.member",
            "m.room.join_rules",
            "m.room.power_levels",
            "m.room.third_party_invite",
            "m.room.history_visibility",
            "m.room.guest_access",
            "m.room.encryption",
            "m.space.child",
            "m.space.parent",
        ];

        if !room.encryption_state().is_state_encrypted() {
            trace!("Sending plaintext event as the room does NOT support encrypted state events.");
            false
        } else if PLAINTEXT_ONLY_TYPES.contains(&event_type) {
            trace!("Sending plaintext event as its type is excluded from encryption.");
            false
        } else {
            true
        }
    }
}
#[cfg(feature = "experimental-encrypted-state-events")]
impl<'a> IntoFuture for SendRawStateEvent<'a> {
    type Output = Result<send_state_event::v3::Response>;
    boxed_into_future!(extra_bounds: 'a);

    /// Turns the builder into a future that sends the raw state event.
    ///
    /// The future first checks that the room is joined. When
    /// `should_encrypt` says so, the content is encrypted, the state key is
    /// rewritten to `"{event_type}:{state_key}"` and the event is sent as
    /// `m.room.encrypted`; otherwise it is sent unchanged.
    ///
    /// # Panics
    ///
    /// Panics in the encryption path if the client's Olm machine is not
    /// available.
    fn into_future(self) -> Self::IntoFuture {
        let Self { room, mut event_type, state_key, mut content, tracing_span, request_config } =
            self;
        let fut = async move {
            room.ensure_room_joined()?;

            // Owned copy of the borrowed key: the encrypted path replaces it,
            // and the request takes an owned string either way.
            let mut state_key = state_key.to_owned();

            if Self::should_encrypt(room, event_type) {
                use tracing::debug;

                Span::current().record("should_encrypt", true);
                debug!(
                    room_id = ?room.room_id(),
                    "Sending encrypted event because the room is encrypted.",
                );

                // Sync members, query keys and pre-share the room key before
                // encrypting.
                ensure_room_encryption_ready(room).await?;

                let olm = room.client.olm_machine().await;
                let olm = olm.as_ref().expect("Olm machine wasn't started");

                content = olm
                    .encrypt_state_event_raw(room.room_id(), event_type, &state_key, &content)
                    .await?
                    .cast_unchecked();

                // Fold the original event type into the state key, then send
                // under the encrypted event type instead of the original one.
                state_key = format!("{event_type}:{state_key}");
                event_type = "m.room.encrypted";
            } else {
                Span::current().record("should_encrypt", false);
            }

            let request = send_state_event::v3::Request::new_raw(
                room.room_id().to_owned(),
                event_type.into(),
                // Already an owned `String` that is not used afterwards: move
                // it rather than cloning it a second time.
                state_key,
                content,
            );
            let response = room.client.send(request).with_request_config(request_config).await?;
            Span::current().record("event_id", tracing::field::debug(&response.event_id));
            info!("Sent event in room");
            Ok(response)
        };

        // Run the request inside the span captured when the builder was created.
        Box::pin(fut.instrument(tracing_span))
    }
}
/// Builder for sending a typed state event to a room.
///
/// The content is serialized to JSON when the builder is created; the request
/// itself is only performed once the value is awaited (see the `IntoFuture`
/// implementation).
// No `Debug` implementation is provided for this type, so the
// `missing_debug_implementations` lint is silenced here.
#[allow(missing_debug_implementations)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub struct SendStateEvent<'a> {
/// Room the event is sent to.
room: &'a Room,
/// Event type string, taken from the content's `event_type()`.
event_type: String,
/// Owned copy of the state key supplied by the caller.
state_key: String,
/// Outcome of serializing the content to JSON; a serialization error is
/// kept here and surfaced when the future runs.
content: serde_json::Result<serde_json::Value>,
/// Optional request configuration, forwarded to the raw send.
request_config: Option<RequestConfig>,
}
#[cfg(feature = "experimental-encrypted-state-events")]
impl<'a> SendStateEvent<'a> {
    /// Creates a builder for sending state `content` under `state_key` to
    /// `room`.
    ///
    /// The event type is read from the content, the state key is copied into
    /// an owned string, and the content is serialized to JSON right away; a
    /// serialization failure is stored and only reported once the builder is
    /// awaited.
    pub(crate) fn new<C, K>(room: &'a Room, state_key: &K, content: C) -> Self
    where
        C: StateEventContent,
        C::StateKey: Borrow<K>,
        K: AsRef<str> + ?Sized,
    {
        Self {
            room,
            event_type: content.event_type().to_string(),
            state_key: state_key.as_ref().to_owned(),
            content: serde_json::to_value(&content),
            request_config: None,
        }
    }

    /// Sets the [`RequestConfig`] to use for this send.
    pub fn with_request_config(self, request_config: RequestConfig) -> Self {
        Self { request_config: Some(request_config), ..self }
    }
}
#[cfg(feature = "experimental-encrypted-state-events")]
impl<'a> IntoFuture for SendStateEvent<'a> {
    type Output = Result<send_state_event::v3::Response>;
    boxed_into_future!(extra_bounds: 'a);

    /// Turns the builder into a future that sends the state event.
    ///
    /// A serialization error recorded at construction time is returned
    /// first; otherwise the JSON content is handed to
    /// `Room::send_state_event_raw` together with the configured request
    /// config.
    fn into_future(self) -> Self::IntoFuture {
        let Self { room, state_key, event_type, content: serialized, request_config } = self;
        let send = async move {
            let raw_send = room.send_state_event_raw(&event_type, &state_key, serialized?);
            assign!(raw_send, { request_config }).await
        };
        Box::pin(send)
    }
}
/// Runs the preparation steps that precede encrypting an event for `room`.
///
/// In order: syncs the room members if they are not yet synced, queries keys
/// for untracked or dirty users, and pre-shares the room key. The first
/// failing step aborts the sequence and its error is returned.
#[cfg(feature = "e2e-encryption")]
async fn ensure_room_encryption_ready(room: &Room) -> Result<()> {
    let needs_member_sync = !room.are_members_synced();
    if needs_member_sync {
        room.sync_members().await?;
    }

    // Done unconditionally, including when the member sync above was skipped.
    room.query_keys_for_untracked_or_dirty_users().await?;

    room.preshare_room_key().await?;
    Ok(())
}