headless-talk 0.6.1

Headless talk implementation
Documentation
use arrayvec::ArrayVec;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use futures_loco_protocol::session::LocoSession;
use nohash_hasher::IntMap;
use talk_loco_client::talk::session::load_channel_list::ChannelListData;

use crate::{
    database::{
        model::{channel::ChannelListRow, chat::ChatRow},
        schema::{channel_list, chat},
        DatabasePool,
    },
    ClientResult,
};

use super::{channel::ChannelUpdater, chat::ChatUpdater};

#[derive(Debug, Clone, Copy)]
pub struct ChannelListUpdater<'a> {
    session: &'a LocoSession,
    pool: &'a DatabasePool,
}

impl<'a> ChannelListUpdater<'a> {
    pub fn new(session: &'a LocoSession, pool: &'a DatabasePool) -> Self {
        Self { session, pool }
    }

    pub async fn update(
        self,
        iter: impl IntoIterator<Item = ChannelListData>,
        deleted_ids: impl IntoIterator<Item = i64> + Send + 'static,
    ) -> ClientResult<()> {
        let update_map = self
            .pool
            .spawn(|conn| {
                Ok(IntMap::from_iter(
                    channel_list::table
                        .select((channel_list::id, channel_list::last_update))
                        .load::<(i64, i64)>(conn)?
                        .into_iter(),
                ))
            })
            .await?;

        let mut update_list = Vec::<ChannelListRow>::new();
        let mut chat_list = Vec::<ChatRow>::new();

        for list_data in iter {
            let last_log_id = self
                .pool
                .spawn(move |conn| {
                    let last_log_id: Option<i64> = chat::table
                        .filter(chat::channel_id.eq(list_data.id))
                        .select(chat::log_id)
                        .order_by(chat::log_id.desc())
                        .first::<i64>(conn)
                        .optional()?;

                    Ok(last_log_id)
                })
                .await?
                .unwrap_or(0);

            if last_log_id < list_data.last_log_id {
                ChatUpdater::new(self.session, self.pool, list_data.id)
                    .update(last_log_id, list_data.last_log_id)
                    .await?;
            }

            if update_map
                .get(&list_data.id)
                .map(|&last_update| last_update >= list_data.last_update)
                .unwrap_or(false)
            {
                continue;
            }

            let channel_type = if let Some(ty) = list_data.channel_type.ty() {
                ty
            } else {
                continue;
            };

            if ChannelUpdater::new(list_data.id)
                .initialize(self.session, self.pool)
                .await?
                .is_none()
            {
                continue;
            }

            let list_row = ChannelListRow {
                id: list_data.id,
                channel_type: channel_type.as_str().to_string(),

                display_users: {
                    let mut vec = ArrayVec::<_, 4>::new();

                    if let Some(ids) = list_data.icon_user_ids {
                        vec.extend(ids.into_iter().take(4));
                    }

                    serde_json::to_string(&vec).unwrap()
                },

                unread_count: list_data.unread_count,
                active_user_count: list_data.active_member_count,

                last_seen_log_id: list_data.last_seen_log_id,
                last_update: list_data.last_update,
            };

            if let Some(chatlog) = list_data.chatlog {
                chat_list.push(ChatRow::from_chatlog(chatlog, None));
            }

            update_list.push(list_row);
        }

        self.pool
            .spawn_transaction(move |conn| {
                diesel::replace_into(channel_list::table)
                    .values(update_list)
                    .execute(conn)?;

                diesel::insert_or_ignore_into(chat::table)
                    .values(chat_list)
                    .execute(conn)?;

                for channel_id in deleted_ids {
                    ChannelUpdater::new(channel_id).remove(conn)?;
                }

                Ok(())
            })
            .await?;

        Ok(())
    }
}