mxlink 1.15.0

A library for connecting (linking) to the Matrix Client-Server API (for building bots, etc.)
Documentation
use matrix_sdk::{
    Room,
    deserialized_responses::TimelineEvent,
    ruma::{
        OwnedEventId,
        api::client::relations::get_relating_events_with_rel_type,
        events::{
            AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
            relation::RelationType,
        },
    },
};

/// Default number of related events requested per page when fetching a thread's messages.
const FETCH_BATCH_SIZE: u32 = 1000;

/// Options controlling how [`Threads::get_messages`] pages through a thread.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build it with a
/// struct literal; use [`ThreadGetMessagesParams::new`] (or `Default`) and the builder-style
/// setters instead.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ThreadGetMessagesParams {
    /// Value sent as the `limit` of every relations request, i.e. the page size.
    pub batch_size: u32,
}

impl Default for ThreadGetMessagesParams {
    /// Builds parameters whose `batch_size` is `FETCH_BATCH_SIZE`.
    fn default() -> Self {
        let batch_size = FETCH_BATCH_SIZE;
        Self { batch_size }
    }
}

impl ThreadGetMessagesParams {
    /// Creates parameters populated with the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the parameters with `batch_size` replaced by `size`.
    pub fn batch_size(self, size: u32) -> Self {
        // Struct-update syntax keeps every other field as it was.
        Self {
            batch_size: size,
            ..self
        }
    }
}

/// Helper for reading the messages of a thread through a [`super::MatrixLink`].
#[derive(Clone)]
pub struct Threads {
    // Link whose Matrix client is used to send the relations requests.
    matrix_link: super::MatrixLink,
}

impl Threads {
    /// Creates a `Threads` helper backed by the given link.
    pub(super) fn new(matrix_link: super::MatrixLink) -> Self {
        Self { matrix_link }
    }

    /// Collects the message-like events of a thread, ordered by `origin_server_ts`.
    ///
    /// The thread root event is fetched first and kept if it is a message-like event. The
    /// events related to it with [`RelationType::Thread`] are then requested page by page
    /// (`params.batch_size` events per request) until the server stops returning a
    /// `next_batch` token. Each page is passed to `extract_messages_from_http_response`,
    /// which appends its events to the result.
    ///
    /// # Errors
    ///
    /// Returns an error if the root event cannot be fetched or deserialized, if a relations
    /// request fails, or if processing a page of results fails.
    #[tracing::instrument(name="threads_get_messages", skip_all, fields(room_id = room.room_id().as_str(), thread_id = thread_id.as_str()))]
    pub async fn get_messages(
        &self,
        room: &Room,
        thread_id: OwnedEventId,
        params: ThreadGetMessagesParams,
    ) -> Result<Vec<AnySyncMessageLikeEvent>, matrix_sdk::Error> {
        let mut events: Vec<AnySyncMessageLikeEvent> = Vec::new();

        tracing::trace!("Fetching thread root event..");
        let thread_event: TimelineEvent = room.event(&thread_id, None).await?;
        if let AnySyncTimelineEvent::MessageLike(thread_event) = thread_event.raw().deserialize()? {
            events.push(thread_event);
        }

        // The first page is requested without a `from` token; an empty string is not a
        // pagination token and should not be sent as one.
        let mut from: Option<String> = None;

        loop {
            tracing::trace!(
                ?from,
                batch_size = params.batch_size,
                "Fetching related events batch..",
            );

            let mut request = get_relating_events_with_rel_type::v1::Request::new(
                room.room_id().to_owned(),
                thread_id.clone(),
                RelationType::Thread,
            );

            request.from = from.take();
            request.limit = Some(params.batch_size.into());

            let mut http_response = self.matrix_link.client().send(request).await?;

            // Take the continuation token out first so the response can be handed over by
            // value without cloning the whole page of events.
            from = http_response.next_batch.take();

            extract_messages_from_http_response(room, http_response, &mut events).await?;

            if from.is_none() {
                break;
            }
        }

        events.sort_by_key(|event| event.origin_server_ts());

        Ok(events)
    }
}

async fn extract_messages_from_http_response(
    room: &Room,
    http_response: get_relating_events_with_rel_type::v1::Response,
    events: &mut Vec<AnySyncMessageLikeEvent>,
) -> Result<(), matrix_sdk::Error> {
    for event in http_response.chunk.iter().rev() {
        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
            SyncMessageLikeEvent::Original(_),
        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
        {
            if let Ok(event) = room.decrypt_event(event.cast_ref_unchecked(), None).await {
                if let AnySyncTimelineEvent::MessageLike(ev) = event.raw().deserialize()? {
                    events.push(ev);
                }
            } else {
                tracing::error!("failed-to-decrypt?: {:?}", event);
            }
        } else {
            events.push(event.deserialize_as::<AnySyncMessageLikeEvent>()?);
        };
    }

    Ok(())
}