1use std::{
2 ops::Deref,
3 sync::{Arc, Weak},
4};
5
6use common::v1::types::{
7 voice::SfuCommand, AuditLogEntry, ChannelId, Media, Message, RoomId, SfuId, UserId,
8};
9use common::v1::types::{MessageSync, MessageType};
10use dashmap::DashMap;
11
12use sqlx::PgPool;
13use tokio::sync::broadcast::Sender;
14use tracing::error;
15use url::Url;
16
17use crate::{
18 config::Config,
19 data::{postgres::Postgres, Data},
20 services::Services,
21 sync::Connection,
22 Error, Result,
23};
24
25pub struct ServerStateInner {
26 pub config: Config,
27 pub pool: PgPool,
28 pub services: Weak<Services>,
29
30 pub sushi: Sender<MessageSync>,
32 pub sushi_sfu: Sender<SfuCommand>,
34
35 pub blobs: opendal::Operator,
37
38 pub sfus: DashMap<SfuId, ()>,
39 pub thread_to_sfu: DashMap<ChannelId, SfuId>,
40}
41
42pub struct ServerState {
43 pub inner: Arc<ServerStateInner>,
44 pub services: Arc<Services>,
45
46 pub syncers: Arc<DashMap<String, Connection>>,
48}
49
50impl ServerStateInner {
51 pub fn data(&self) -> Box<dyn Data> {
52 Box::new(Postgres {
53 pool: self.pool.clone(),
54 })
55 }
56
57 pub fn services(&self) -> Arc<Services> {
58 self.services
59 .upgrade()
60 .expect("services should always exist while serverstateinner is alive")
61 }
62
63 pub async fn broadcast_room(
70 &self,
71 _room_id: RoomId,
72 _user_id: UserId, msg: MessageSync,
74 ) -> Result<()> {
75 let _ = self.sushi.send(msg);
76 Ok(())
77 }
78
79 pub async fn broadcast_channel(
80 &self,
81 _thread_id: ChannelId,
82 _user_id: UserId,
83 msg: MessageSync,
84 ) -> Result<()> {
85 let _ = self.sushi.send(msg);
86 Ok(())
87 }
88
89 pub fn broadcast(&self, msg: MessageSync) -> Result<()> {
90 let _ = self.sushi.send(msg);
91 Ok(())
92 }
93
94 pub fn get_s3_url(&self, path: &str) -> Result<Url> {
95 let mut u = Url::parse("s3://")?;
96 u.set_host(Some(&self.config.s3.bucket))?;
97 u.set_path(path);
98 Ok(u)
99 }
100
101 pub async fn presign(&self, _media: &mut Media) -> Result<()> {
103 Ok(())
106 }
107
108 pub async fn audit_log_append(&self, entry: AuditLogEntry) -> Result<()> {
109 self.data().audit_logs_room_append(entry.clone()).await?;
110 self.broadcast_room(
111 entry.room_id,
112 entry.user_id,
113 MessageSync::AuditLogEntryCreate { entry },
114 )
115 .await?;
116 Ok(())
117 }
118
119 pub async fn presign_message(&self, message: &mut Message) -> Result<()> {
121 match &mut message.message_type {
122 MessageType::DefaultMarkdown(message) => {
123 for media in &mut message.attachments {
124 self.presign(media).await?;
125 }
126 for emb in &mut message.embeds {
127 if let Some(m) = &mut emb.media {
128 self.presign(m).await?;
129 }
130 if let Some(m) = &mut emb.author_avatar {
131 self.presign(m).await?;
132 }
133 if let Some(m) = &mut emb.site_avatar {
134 self.presign(m).await?;
135 }
136 }
137 }
138 _ => {}
139 }
140 Ok(())
141 }
142
143 pub async fn alloc_sfu(&self, thread_id: ChannelId) -> Result<SfuId> {
147 if let Some(existing) = self.thread_to_sfu.get(&thread_id) {
148 return Ok(*existing);
149 }
150
151 let sfu_thread_counts = DashMap::<SfuId, u64>::new();
152 for i in &self.sfus {
153 sfu_thread_counts.insert(*i.key(), 0);
154 }
155 for i in &self.thread_to_sfu {
156 *sfu_thread_counts.get_mut(i.value()).unwrap() += 1;
157 }
158 let mut sorted: Vec<_> = sfu_thread_counts.into_iter().collect();
159 sorted.sort_by_key(|(_, count)| *count);
160 if let Some((chosen, _)) = sorted.first() {
161 self.thread_to_sfu.insert(thread_id, *chosen);
162 let thread = self.services().channels.get(thread_id, None).await?;
163 self.sushi_sfu
164 .send(SfuCommand::Thread {
165 thread: thread.into(),
166 })
167 .unwrap();
168 Ok(*chosen)
169 } else {
170 error!("no available sfu");
171 Err(Error::BadStatic("no available sfu"))
172 }
173 }
174}
175
176impl ServerState {
177 pub fn new(config: Config, pool: PgPool, blobs: opendal::Operator) -> Self {
178 let services = Arc::new_cyclic(|weak| {
181 let inner = Arc::new(ServerStateInner {
182 config,
183 pool,
184 services: weak.to_owned(),
185 blobs,
186
187 sushi: tokio::sync::broadcast::channel(100).0,
189 sushi_sfu: tokio::sync::broadcast::channel(100).0,
190
191 sfus: DashMap::new(),
192 thread_to_sfu: DashMap::new(),
193 });
194 Services::new(inner.clone())
195 });
196 Self {
197 inner: services.state.clone(),
198 syncers: Arc::new(DashMap::new()),
199 services,
201 }
202 }
203
204 pub fn config(&self) -> &Config {
205 &self.inner.config
206 }
207
208 pub fn data(&self) -> Box<dyn Data> {
209 self.inner.data()
210 }
211
212 pub fn services(self: &Arc<Self>) -> Arc<Services> {
213 self.services.clone()
214 }
215}
216
217impl Deref for ServerState {
218 type Target = ServerStateInner;
219
220 fn deref(&self) -> &Self::Target {
221 &self.inner
222 }
223}