1pub mod channel;
2mod conn;
3mod constants;
4mod database;
5pub mod event;
6pub mod handler;
7pub mod init;
8mod task;
9mod updater;
10pub mod user;
11
12use channel::{
13 load_list_item,
14 normal::{self, NormalChannelOp},
15 open::OpenChannelOp,
16 ChannelListItem, ChannelOp, ClientChannel,
17};
18use conn::Conn;
19use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
20
21use database::{
22 model::channel::ChannelListRow,
23 schema::{channel_list, user_profile},
24 PoolTaskError,
25};
26use talk_loco_client::{
27 talk::session::{channel::chat_on::ChatOnChannelType, TalkSession},
28 RequestError,
29};
30use task::BackgroundTask;
31use thiserror::Error;
32
33pub use talk_loco_client;
34
35#[derive(Debug)]
36pub struct HeadlessTalk {
37 pub conn: Conn,
38
39 pub _ping_task: BackgroundTask,
40 pub _stream_task: BackgroundTask,
41}
42
43impl HeadlessTalk {
44 pub fn user_id(&self) -> i64 {
45 self.conn.user_id
46 }
47
48 pub async fn channel_list(&self) -> Result<Vec<(i64, ChannelListItem)>, PoolTaskError> {
49 let rows = self
50 .conn
51 .pool
52 .spawn(|conn| {
53 let rows = channel_list::table
54 .select(channel_list::all_columns)
55 .load::<ChannelListRow>(conn)?;
56
57 Ok(rows)
58 })
59 .await?;
60
61 let mut list = Vec::with_capacity(rows.capacity());
62
63 for row in rows {
64 if let Some(list_item) = load_list_item(&self.conn.pool, &row).await? {
65 list.push((row.id, list_item))
66 }
67 }
68
69 Ok(list)
70 }
71
72 pub async fn load_channel(&self, id: i64) -> ClientResult<Option<ClientChannel>> {
73 let last_seen_log_id = self
74 .conn
75 .pool
76 .spawn(move |conn| {
77 let last_seen_log_id: Option<i64> = channel_list::table
78 .filter(channel_list::id.eq(id))
79 .select(channel_list::last_seen_log_id)
80 .first::<Option<i64>>(conn)?;
81
82 Ok(last_seen_log_id)
83 })
84 .await?;
85
86 let res = TalkSession(&self.conn.session)
87 .channel(id)
88 .chat_on(last_seen_log_id)
89 .await?;
90
91 if let (Some(active_user_ids), Some(watermarks)) = (res.active_user_ids, res.watermarks) {
92 let active_user_count = active_user_ids.len() as i32;
93 let watermark_iter = active_user_ids.into_iter().zip(watermarks.into_iter());
94
95 self.conn
96 .pool
97 .spawn_transaction(move |conn| {
98 diesel::update(channel_list::table)
99 .filter(channel_list::id.eq(id))
100 .set(channel_list::active_user_count.eq(active_user_count))
101 .execute(conn)?;
102
103 for (user_id, watermark) in watermark_iter {
104 diesel::update(user_profile::table)
105 .filter(
106 user_profile::channel_id
107 .eq(id)
108 .and(user_profile::id.eq(user_id)),
109 )
110 .set(user_profile::watermark.eq(watermark))
111 .execute(conn)?;
112 }
113
114 Ok(())
115 })
116 .await?;
117 }
118
119 Ok(match res.channel_type {
120 ChatOnChannelType::DirectChat(normal)
121 | ChatOnChannelType::MultiChat(normal)
122 | ChatOnChannelType::MemoChat(normal) => Some(ClientChannel::Normal(
123 normal::load_channel(id, &self.conn, normal).await?,
124 )),
125
126 _ => None,
127 })
128 }
129
130 pub fn channel(&self, id: i64) -> ChannelOp<'_> {
131 ChannelOp::new(id, &self.conn)
132 }
133
134 pub fn normal_channel(&self, id: i64) -> NormalChannelOp<'_> {
135 NormalChannelOp::new(id, &self.conn)
136 }
137
138 pub fn open_channel(&self, id: i64, link_id: i64) -> OpenChannelOp<'_> {
139 OpenChannelOp::new(id, link_id, &self.conn)
140 }
141
142 pub async fn set_status(&self, client_status: ClientStatus) -> ClientResult<()> {
143 TalkSession(&self.conn.session)
144 .set_status(client_status as _)
145 .await?;
146
147 Ok(())
148 }
149}
150
/// Status value passed to `HeadlessTalk::set_status`.
///
/// The enum is laid out as an `i32`, and each variant's discriminant is the number that
/// results when the value is cast with `as`.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientStatus {
    /// Numeric value `1`.
    Unlocked = 1,
    /// Numeric value `2`.
    Locked = 2,
}
157
158pub type ClientResult<T> = Result<T, ClientError>;
159
160#[derive(Debug, Error)]
161#[error(transparent)]
162pub enum ClientError {
163 Request(#[from] RequestError),
164 Database(#[from] PoolTaskError),
165}