1use super::HingeClient;
2use super::render::{render_profile, summarize_connection_initiation};
3use super::serde_helpers::{
4 attachment_from_value, parse_json_value_with_path, parse_ts, sanitize_component,
5};
6use crate::errors::HingeError;
7use crate::logging::log_request;
8use crate::models::{
9 ExportChatInput, ExportChatResult, ExportedMediaFile, SendbirdChannelHandle,
10 SendbirdGroupChannel, SendbirdMessage,
11};
12use crate::storage::Storage;
13use chrono::{DateTime, Local, Utc};
14use futures_util::{SinkExt, StreamExt};
15use serde_json::json;
16use std::cmp::min;
17use std::collections::HashSet;
18use std::fmt::Write as FmtWrite;
19use std::fs;
20use std::path::Path;
21use std::time::Duration;
22use tokio_tungstenite::tungstenite::Message;
23use uuid::Uuid;
24
/// `Accept-Language` value sent on Sendbird REST requests and on the WebSocket handshake.
pub(super) const SENDBIRD_ACCEPT_LANGUAGE: &str = "en-GB";
/// `user-agent` value attached to Sendbird REST requests by `sendbird_headers`.
pub(super) const SENDBIRD_REST_USER_AGENT: &str = "Jios/4.26.0";
/// `Origin` header value sent on the Sendbird WebSocket handshake.
pub(super) const SENDBIRD_WS_ORIGIN: &str = "https://web-sb-kr-7-704.sendbird.com";
28
29impl<S: Storage + Clone> HingeClient<S> {
30 pub(super) fn sendbird_header_value(&self) -> String {
31 format!(
32 "iOS,{},{},{}",
33 self.settings.os_version,
34 self.settings.sendbird_sdk_version,
35 self.settings.sendbird_app_id
36 )
37 }
38
39 pub(super) fn sendbird_user_agent_value(&self) -> String {
40 format!("iOS/c{}///", self.settings.sendbird_sdk_version)
41 }
42
43 pub(super) fn sendbird_sdk_user_agent_value(&self) -> String {
44 format!(
45 "main_sdk_info=chat/ios/{}&device_os_platform=ios&os_version={}",
46 self.settings.sendbird_sdk_version, self.settings.os_version
47 )
48 }
49
50 pub(super) fn sendbird_headers(&self) -> Result<reqwest::header::HeaderMap, HingeError> {
51 use reqwest::header::{HeaderMap, HeaderValue};
52 let mut h = HeaderMap::new();
53 h.insert("accept", HeaderValue::from_static("*/*"));
54 h.insert("connection", HeaderValue::from_static("keep-alive"));
55 h.insert(
56 "accept-language",
57 HeaderValue::from_static(SENDBIRD_ACCEPT_LANGUAGE),
58 );
59 h.insert(
60 "x-session-key",
61 HeaderValue::from_str(&self.session_id)
62 .map_err(|e| HingeError::Http(format!("Invalid session key header: {}", e)))?,
63 );
64 h.insert(
65 "x-device-id",
66 HeaderValue::from_str(&self.device_id)
67 .map_err(|e| HingeError::Http(format!("Invalid device id header: {}", e)))?,
68 );
69 h.insert(
70 "x-install-id",
71 HeaderValue::from_str(&self.install_id)
72 .map_err(|e| HingeError::Http(format!("Invalid install id header: {}", e)))?,
73 );
74 h.insert(
75 "sb-user-id",
76 HeaderValue::from_str(
77 self.hinge_auth
78 .as_ref()
79 .map(|token| token.identity_id.as_str())
80 .unwrap_or_default(),
81 )
82 .map_err(|e| HingeError::Http(format!("Invalid sb-user-id header: {}", e)))?,
83 );
84 if let Some(sb_auth) = &self.sendbird_auth {
85 h.insert(
86 "sb-access-token",
87 HeaderValue::from_str(&sb_auth.token)
88 .map_err(|e| HingeError::Http(format!("Invalid sb-access-token: {}", e)))?,
89 );
90 }
91 if let Some(session_key) = &self.sendbird_session_key {
92 h.insert(
93 "Session-Key",
94 HeaderValue::from_str(session_key)
95 .map_err(|e| HingeError::Http(format!("Invalid session key: {}", e)))?,
96 );
97 }
98 let ts = chrono::Utc::now().timestamp_millis();
100 h.insert(
101 "Request-Sent-Timestamp",
102 HeaderValue::from_str(&ts.to_string())
103 .map_err(|e| HingeError::Http(format!("Invalid timestamp: {}", e)))?,
104 );
105 h.insert(
107 "SendBird",
108 HeaderValue::from_str(&self.sendbird_header_value())
109 .map_err(|e| HingeError::Http(format!("Invalid SendBird header: {}", e)))?,
110 );
111 h.insert(
112 "SB-User-Agent",
113 HeaderValue::from_str(&self.sendbird_user_agent_value())
114 .map_err(|e| HingeError::Http(format!("Invalid SB-User-Agent: {}", e)))?,
115 );
116 h.insert(
117 "SB-SDK-User-Agent",
118 HeaderValue::from_str(&self.sendbird_sdk_user_agent_value())
119 .map_err(|e| HingeError::Http(format!("Invalid SB-SDK-User-Agent: {}", e)))?,
120 );
121 h.insert(
122 "user-agent",
123 HeaderValue::from_static(SENDBIRD_REST_USER_AGENT),
124 );
125 Ok(h)
126 }
127
128 async fn sendbird_get(&self, path_and_query: &str) -> Result<reqwest::Response, HingeError> {
129 let url = format!("{}/v3{}", self.settings.sendbird_api_url, path_and_query);
130 let headers = self.sendbird_headers()?;
131 log_request("GET", &url, &headers, None);
132 let res = self.http.get(url).headers(headers.clone()).send().await?;
133 log::info!("[sendbird] GET {} -> {}", path_and_query, res.status());
134 Ok(res)
135 }
136
137 async fn ensure_sendbird_session(&mut self) -> Result<(), HingeError> {
138 if self.sendbird_ws_connected {
140 return Ok(());
141 }
142
143 if self.sendbird_auth.is_none() {
145 self.authenticate_with_sendbird().await?;
146 }
147
148 let (cmd_tx, broadcast_tx) = self.start_sendbird_ws().await?;
150 self.sendbird_ws_cmd_tx = Some(cmd_tx);
151 self.sendbird_ws_broadcast_tx = Some(broadcast_tx);
152 self.sendbird_ws_connected = true;
153 Ok(())
154 }
155
156 async fn start_sendbird_ws(
157 &mut self,
158 ) -> Result<
159 (
160 tokio::sync::mpsc::UnboundedSender<String>,
161 tokio::sync::broadcast::Sender<String>,
162 ),
163 HingeError,
164 > {
165 let sb = self
166 .sendbird_auth
167 .as_ref()
168 .ok_or_else(|| HingeError::Auth("sendbird token missing".into()))?;
169 let user_id = self
170 .hinge_auth
171 .as_ref()
172 .map(|t| t.identity_id.clone())
173 .unwrap_or_default();
174 let ws_url = format!(
175 "{}/?p=iOS&sv={}&pv={}&uikit_config=0&use_local_cache=0&include_extra_data=premium_feature_list,file_upload_size_limit,emoji_hash,application_attributes,notifications,message_template,ai_agent&include_poll_details=1&user_id={}&ai={}&pmce=1&expiring_session=0&config_ts=0",
176 self.settings.sendbird_ws_url,
177 self.settings.sendbird_sdk_version,
178 self.settings.os_version,
179 user_id,
180 self.settings.sendbird_app_id
181 );
182 let ws_ts = chrono::Utc::now().timestamp_millis().to_string();
183 let host = ws_url
184 .trim_start_matches("wss://")
185 .trim_start_matches("ws://")
186 .split('/')
187 .next()
188 .unwrap_or("");
189 let ws_key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
190 let mut builder = tokio_tungstenite::tungstenite::http::Request::builder().uri(&ws_url);
191 if let Some(sk) = &self.sendbird_session_key {
192 builder = builder.header("SENDBIRD-WS-AUTH", sk);
193 } else {
194 builder = builder.header("SENDBIRD-WS-TOKEN", sb.token.clone());
195 }
196 builder = builder
197 .header("Accept", "*/*")
198 .header("Accept-Encoding", "gzip, deflate")
199 .header("Sec-WebSocket-Extensions", "permessage-deflate")
200 .header("Sec-WebSocket-Key", &ws_key)
201 .header("Sec-WebSocket-Version", "13")
202 .header("Request-Sent-Timestamp", &ws_ts)
203 .header("Host", host)
204 .header("Origin", SENDBIRD_WS_ORIGIN)
205 .header("Accept-Language", SENDBIRD_ACCEPT_LANGUAGE)
206 .header("x-session-key", &self.session_id)
207 .header("x-device-id", &self.device_id)
208 .header("x-install-id", &self.install_id)
209 .header("sb-user-id", &user_id)
210 .header("sb-access-token", &sb.token)
211 .header("SendBird", self.sendbird_header_value())
212 .header("SB-User-Agent", self.sendbird_user_agent_value())
213 .header("SB-SDK-User-Agent", self.sendbird_sdk_user_agent_value())
214 .header("Connection", "Upgrade")
215 .header("Upgrade", "websocket")
216 .header("User-Agent", &self.hinge_user_agent());
217 {
219 let mut pairs: Vec<(String, String)> = Vec::new();
220 if let Some(sk) = &self.sendbird_session_key {
221 pairs.push(("SENDBIRD-WS-AUTH".into(), sk.clone()));
222 } else {
223 pairs.push(("SENDBIRD-WS-TOKEN".into(), sb.token.clone()));
224 }
225 pairs.push(("Accept".into(), "*/*".into()));
226 pairs.push(("Accept-Encoding".into(), "gzip, deflate".into()));
227 pairs.push((
228 "Sec-WebSocket-Extensions".into(),
229 "permessage-deflate".into(),
230 ));
231 pairs.push(("Accept-Language".into(), SENDBIRD_ACCEPT_LANGUAGE.into()));
232 pairs.push(("Host".into(), host.to_string()));
233 pairs.push(("Origin".into(), SENDBIRD_WS_ORIGIN.into()));
234 pairs.push(("Sec-WebSocket-Key".into(), ws_key.clone()));
235 pairs.push(("Sec-WebSocket-Version".into(), "13".into()));
236 pairs.push(("Request-Sent-Timestamp".into(), ws_ts.clone()));
237 pairs.push(("x-session-key".into(), self.session_id.clone()));
238 pairs.push(("x-device-id".into(), self.device_id.clone()));
239 pairs.push(("x-install-id".into(), self.install_id.clone()));
240 pairs.push(("sb-user-id".into(), user_id.clone()));
241 pairs.push(("sb-access-token".into(), sb.token.clone()));
242 pairs.push(("SendBird".into(), self.sendbird_header_value()));
243 pairs.push(("SB-User-Agent".into(), self.sendbird_user_agent_value()));
244 pairs.push((
245 "SB-SDK-User-Agent".into(),
246 self.sendbird_sdk_user_agent_value(),
247 ));
248 pairs.push(("Connection".into(), "Upgrade".into()));
249 pairs.push(("Upgrade".into(), "websocket".into()));
250 pairs.push(("User-Agent".into(), self.hinge_user_agent()));
251 log::info!("━━━━━━━━━━ WS REQUEST ━━━━━━━━━━");
252 log::info!("GET {}", ws_url);
253 log::debug!("Headers:\n{}", crate::logging::format_ws_headers(&pairs));
254 log::info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
255 }
256
257 let req: tokio_tungstenite::tungstenite::http::Request<()> = builder
258 .body(())
259 .map_err(|e| HingeError::Http(e.to_string()))?;
260 let (ws, _resp) = tokio_tungstenite::connect_async(req)
261 .await
262 .map_err(|e| HingeError::Http(e.to_string()))?;
263 let (write_half, mut read_half) = ws.split();
264 let write_half = std::sync::Arc::new(tokio::sync::Mutex::new(write_half));
265
266 let (tx_cmd, mut rx_cmd) = tokio::sync::mpsc::unbounded_channel::<String>();
267 let (tx_broadcast, _rx_broadcast) = tokio::sync::broadcast::channel::<String>(1024);
268 let (sk_tx, sk_rx) = tokio::sync::oneshot::channel::<String>();
269
270 {
272 let write_for_pong = write_half.clone();
273 let tx_broadcast_c = tx_broadcast.clone();
274 let pending_requests = self.sendbird_ws_pending_requests.clone();
275 tokio::spawn(async move {
276 let mut sk_tx_opt = Some(sk_tx);
277 while let Some(msg) = read_half.next().await {
278 match msg {
279 Ok(Message::Ping(_)) => {
280 let mut w = write_for_pong.lock().await;
281 let _ = w.send(Message::Pong(Vec::new().into())).await;
282 }
283 Ok(Message::Text(t)) => {
284 let t = t.to_string();
285 if t.starts_with("LOGI")
287 && let Some(start) = t.find('{')
288 && let Ok(val) =
289 serde_json::from_str::<serde_json::Value>(&t[start..])
290 {
291 if let Some(k) = crate::ws::sendbird_logi_session_key(&val) {
292 let _ = tx_broadcast_c.send(format!("__SESSION_KEY__:{}", k));
293 if let Some(tx) = sk_tx_opt.take() {
294 let _ = tx.send(k.to_string());
295 }
296 }
297 log::info!(
299 "[sendbird ws] LOGI received - user_id: {}, ping_interval: {}, pong_timeout: {}",
300 val.get("user_id")
301 .and_then(|v| v.as_str())
302 .unwrap_or("unknown"),
303 val.get("ping_interval")
304 .and_then(|v| v.as_i64())
305 .unwrap_or(0),
306 val.get("pong_timeout")
307 .and_then(|v| v.as_i64())
308 .unwrap_or(0)
309 );
310 }
311 else if t.starts_with("PING") {
313 log::debug!("[sendbird ws] Received PING, sending PONG");
314 if let Some(start) = t.find('{')
315 && let Ok(_val) =
316 serde_json::from_str::<serde_json::Value>(&t[start..])
317 {
318 let pong_response = json!({
319 "sts": chrono::Utc::now().timestamp_millis(),
320 "ts": chrono::Utc::now().timestamp_millis()
321 });
322 let pong_msg = format!("PONG{}", pong_response);
323 let mut w = write_for_pong.lock().await;
324 let _ = w.send(Message::Text(pong_msg.into())).await;
325 }
326 }
327 else if t.starts_with("READ") && t.contains("channel_id") {
329 log::debug!("[sendbird ws] Received READ acknowledgment");
330 if let Some(start) = t.find('{')
332 && let Ok(val) =
333 serde_json::from_str::<serde_json::Value>(&t[start..])
334 && let Some(req_id) = val.get("req_id").and_then(|v| v.as_str())
335 {
336 let mut pending = pending_requests.lock().await;
337 if let Some(tx) = pending.remove(req_id) {
338 let _ = tx.send(val.clone());
339 log::debug!(
340 "[sendbird ws] Matched READ response for req_id: {}",
341 req_id
342 );
343 }
344 }
345 }
346 if t.starts_with("SYEV")
349 && let Some(start) = t.find('{')
350 && let Ok(val) =
351 serde_json::from_str::<serde_json::Value>(&t[start..])
352 {
353 let _ = tx_broadcast_c.send(t.clone());
355 if let Ok(evt) = serde_json::from_value::<
357 crate::models::SendbirdSyevEvent,
358 >(val.clone())
359 {
360 if evt.cat
362 == crate::models::SendbirdSyevEvent::CATEGORY_TYPING_START
363 {
364 log::debug!(
365 "[sendbird ws] SYEV typing start user={} channel={}",
366 evt.data
367 .as_ref()
368 .map(|u| u.user_id.as_str())
369 .unwrap_or("unknown"),
370 evt.channel_url
371 );
372 } else if evt.cat
373 == crate::models::SendbirdSyevEvent::CATEGORY_TYPING_END
374 {
375 log::debug!(
376 "[sendbird ws] SYEV typing end user={} channel={}",
377 evt.data
378 .as_ref()
379 .map(|u| u.user_id.as_str())
380 .unwrap_or("unknown"),
381 evt.channel_url
382 );
383 }
384 if let Ok(json_evt) = serde_json::to_string(&evt) {
386 let _ =
387 tx_broadcast_c.send(format!("__SYEV__:{}", json_evt));
388 }
389 continue;
390 }
391 }
392 let _ = tx_broadcast_c.send(t);
393 }
394 Ok(Message::Binary(b)) => {
395 let _ = tx_broadcast_c.send(String::from_utf8_lossy(&b).into_owned());
396 }
397 Ok(Message::Pong(_)) => {}
398 Ok(Message::Close(frame)) => {
399 if let Some(cf) = frame {
400 let code_u16: u16 = cf.code.into();
401
402 let now = chrono::Utc::now();
406 let ms_timestamp = now.timestamp_millis();
407
408 let last_5_of_ms = (ms_timestamp % 100000) as u16;
410 let last_5_of_seconds = ((ms_timestamp / 1000) % 100000) as u16;
411 let seconds_today = (now.timestamp() % 86400) as u16;
412 let ms_today = ((now.timestamp() % 86400) * 1000
413 + now.timestamp_subsec_millis() as i64)
414 as u32;
415 let ms_today_mod = (ms_today % 65536) as u16; log::debug!(
418 "[sendbird ws] Time analysis - code: {}, last5_ms: {}, last5_sec: {}, sec_today: {}, ms_today_mod: {}",
419 code_u16,
420 last_5_of_ms,
421 last_5_of_seconds,
422 seconds_today,
423 ms_today_mod
424 );
425
426 let code_desc = match code_u16 {
427 1000 => "Normal closure",
429 1001 => "Going away",
430 1002 => "Protocol error",
431 1003 => "Unsupported data",
432 1006 => "Abnormal closure",
433 1008 => "Policy violation",
434 1009 => "Message too big",
435 1010 => "Mandatory extension",
436 1011 => "Internal server error",
437 1015 => "TLS handshake failure",
438 _ if code_u16 >= 10000 => {
440 "Sendbird dynamic code (possibly time-derived)"
441 }
442 _ => "Non-standard close code",
443 };
444 log::warn!(
445 "[sendbird ws] Connection closed - code: {} ({}), reason: {}",
446 code_u16,
447 code_desc,
448 cf.reason
449 );
450 let _ = tx_broadcast_c
451 .send(format!("__CLOSE__:{}:{}", code_u16, cf.reason));
452
453 if !cf.reason.is_empty() {
456 log::info!(
457 "[sendbird ws] Close reason provided: {}",
458 cf.reason
459 );
460 }
461 } else {
462 log::warn!("[sendbird ws] Connection closed without frame");
463 let _ = tx_broadcast_c.send("__CLOSE__".into());
464 }
465 break;
466 }
467 Ok(_) => {}
468 Err(e) => {
469 log::error!("[sendbird ws] WebSocket error: {}", e);
470 let _ = tx_broadcast_c.send(format!("__ERROR__:{}", e));
471 break;
472 }
473 }
474 }
475 });
476 }
477
478 {
480 let write_for_cmds = write_half.clone();
481 tokio::spawn(async move {
482 while let Some(cmd) = rx_cmd.recv().await {
483 let mut w = write_for_cmds.lock().await;
484
485 if cmd.starts_with("__CLOSE__:") {
487 let parts: Vec<&str> = cmd
489 .strip_prefix("__CLOSE__:")
490 .unwrap_or("")
491 .split(':')
492 .collect();
493 let code = parts
494 .first()
495 .and_then(|s| s.parse::<u16>().ok())
496 .unwrap_or(1000);
497 let reason = parts.get(1).unwrap_or(&"").to_string();
498
499 let close_frame = tokio_tungstenite::tungstenite::protocol::CloseFrame {
501 code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(code),
502 reason: reason.into(),
503 };
504 let _ = w.send(Message::Close(Some(close_frame))).await;
505 break; } else {
507 let _ = w.send(Message::Text(cmd.into())).await;
509 }
510 }
511 });
512 }
513
514 if let Ok(k) = sk_rx.await {
516 self.sendbird_session_key = Some(k);
517 log::info!("[sendbird] Session-Key captured");
518 if let Some(path) = &self.session_path {
519 let _ = self.save_session(path);
520 }
521 } else {
522 log::warn!("Sendbird LOGI not received before startup return");
523 }
524 Ok((tx_cmd, tx_broadcast))
525 }
526
527 pub async fn sendbird_list_my_group_channels(
528 &mut self,
529 user_id: &str,
530 limit: usize,
531 ) -> Result<serde_json::Value, HingeError> {
532 self.ensure_sendbird_session().await?;
533 let q = format!(
534 "/users/{}/my_group_channels?&include_left_channel=false&member_state_filter=all&super_mode=all&show_latest_message=false&show_pinned_messages=false&unread_filter=all&show_delivery_receipt=true&show_conversation=false&show_member=true&show_empty=true&limit={}&user_id={}&is_feed_channel=false&order=latest_last_message&hidden_mode=unhidden_only&distinct_mode=all&show_read_receipt=true&show_metadata=true&is_explicit_request=true&show_frozen=true&public_mode=all&include_chat_notification=false",
535 user_id, limit, user_id
536 );
537 let res = self.sendbird_get(&q).await?;
538 self.parse_response(res).await
539 }
540
541 pub async fn sendbird_list_channels_typed(
542 &mut self,
543 limit: usize,
544 ) -> Result<crate::models::SendbirdChannelsResponse, HingeError> {
545 let user_id = self
546 .hinge_auth
547 .as_ref()
548 .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
549 .identity_id
550 .clone();
551 let limit = limit.clamp(1, 200);
552 let raw = self
553 .sendbird_list_my_group_channels(&user_id, limit)
554 .await?;
555 parse_json_value_with_path(raw).map_err(|e| {
556 HingeError::Serde(format!("Failed to parse Sendbird channels response: {}", e))
557 })
558 }
559
560 pub async fn sendbird_get_channel(
561 &mut self,
562 channel_url: &str,
563 ) -> Result<serde_json::Value, HingeError> {
564 self.ensure_sendbird_session().await?;
565 let q = format!(
566 "/sdk/group_channels/{}?&is_feed_channel=false&show_latest_message=false&show_metadata=false&show_empty=false&show_member=true&show_frozen=false&show_read_receipt=true&show_pinned_messages=false&include_chat_notification=false&show_delivery_receipt=true&show_conversation=true",
567 channel_url
568 );
569 let res = self.sendbird_get(&q).await?;
570 self.parse_response(res).await
571 }
572
573 pub async fn sendbird_get_channel_typed(
574 &mut self,
575 channel_url: &str,
576 ) -> Result<SendbirdGroupChannel, HingeError> {
577 let value = self.sendbird_get_channel(channel_url).await?;
578 parse_json_value_with_path(value)
579 .map_err(|e| HingeError::Serde(format!("Failed to parse channel: {}", e)))
580 }
581
582 pub async fn sendbird_get_messages(
583 &mut self,
584 channel_url: &str,
585 message_ts: i64,
586 prev_limit: usize,
587 ) -> Result<crate::models::SendbirdMessagesResponse, HingeError> {
588 self.ensure_sendbird_session().await?;
589 let q = format!(
590 "/group_channels/{}/messages?&include_reply_type=all&sdk_source=external_legacy&with_sorted_meta_array=true&message_ts={}&is_sdk=true&include_reactions_summary=true&include_parent_message_info=false&reverse=true&prev_limit={}&custom_types=%2A&include=false&next_limit=0&include_poll_details=true&show_subchannel_messages_only=false&include_thread_info=false",
591 channel_url, message_ts, prev_limit
592 );
593 let res = self.sendbird_get(&q).await?;
594 self.parse_response(res).await
595 }
596
597 pub async fn sendbird_get_full_messages(
598 &mut self,
599 channel_url: &str,
600 ) -> Result<Vec<SendbirdMessage>, HingeError> {
601 self.ensure_sendbird_session().await?;
602 const PAGE_SIZE: usize = 120;
603 let mut anchor = chrono::Utc::now().timestamp_millis();
604 let mut seen: HashSet<String> = HashSet::new();
605 let mut collected: Vec<(i64, SendbirdMessage)> = Vec::new();
606
607 loop {
608 let batch = self
609 .sendbird_get_messages(channel_url, anchor, PAGE_SIZE)
610 .await?;
611 if batch.messages.is_empty() {
612 break;
613 }
614 let mut earliest = anchor;
615 let mut added = 0usize;
616 for message in batch.messages {
617 if seen.insert(message.message_id.clone()) {
618 let ts = parse_ts(&message.created_at).unwrap_or(anchor);
619 earliest = min(earliest, ts.saturating_sub(1));
620 collected.push((ts, message));
621 added += 1;
622 }
623 }
624 if added == 0 {
625 break;
626 }
627 if earliest >= anchor || earliest <= 0 {
628 break;
629 }
630 anchor = earliest;
631 if collected.len() >= 4000 {
632 log::warn!(
633 "[sendbird] Stopping history fetch after {} messages to avoid huge exports",
634 collected.len()
635 );
636 break;
637 }
638 }
639
640 collected.sort_by_key(|(ts, _)| *ts);
641 Ok(collected.into_iter().map(|(_, msg)| msg).collect())
642 }
643
644 pub async fn export_chat(
645 &mut self,
646 input: ExportChatInput,
647 ) -> Result<ExportChatResult, HingeError> {
648 self.ensure_sendbird_session().await?;
649 let auth = self
650 .hinge_auth
651 .as_ref()
652 .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
653 .clone();
654 let self_user_id = auth.identity_id.clone();
655
656 let prompts_manager = match self.fetch_prompts_manager().await {
657 Ok(mgr) => Some(mgr),
658 Err(err) => {
659 log::warn!("Failed to prefetch prompts for export: {}", err);
660 None
661 }
662 };
663
664 let channel = self.sendbird_get_channel_typed(&input.channel_url).await?;
665 let partner = channel
666 .members
667 .iter()
668 .find(|member| !member.user_id.is_empty() && member.user_id != self_user_id)
669 .cloned()
670 .ok_or_else(|| HingeError::Http("unable to determine conversation partner".into()))?;
671
672 let peer_id = partner.user_id.clone();
673 let profile = self
674 .get_profiles(vec![peer_id.clone()])
675 .await?
676 .into_iter()
677 .next();
678 let profile_content = self
679 .get_profile_content(vec![peer_id.clone()])
680 .await?
681 .into_iter()
682 .next();
683
684 let display_name = profile
685 .as_ref()
686 .map(|p| p.profile.first_name.clone())
687 .filter(|name| !name.trim().is_empty())
688 .or_else(|| {
689 if !partner.nickname.trim().is_empty() {
690 Some(partner.nickname.clone())
691 } else {
692 None
693 }
694 })
695 .unwrap_or_else(|| peer_id.clone());
696
697 let age_label = profile
698 .as_ref()
699 .and_then(|p| p.profile.age)
700 .map(|age| age.to_string())
701 .unwrap_or_else(|| "Unknown age".to_string());
702
703 let initiation_summary_lines = if let Some(lines) = input.initiation_summary_lines.clone() {
704 if lines.is_empty() { None } else { Some(lines) }
705 } else {
706 match self.get_connections_v2().await {
707 Ok(resp) => resp
708 .connections
709 .into_iter()
710 .find(|conn| {
711 let initiator = conn.initiator_id.trim();
712 let subject = conn.subject_id.trim();
713 (!initiator.is_empty() && initiator == self_user_id && subject == peer_id)
714 || (!subject.is_empty()
715 && subject == self_user_id
716 && initiator == peer_id)
717 })
718 .and_then(|conn| {
719 summarize_connection_initiation(
720 &conn,
721 &self_user_id,
722 &peer_id,
723 &display_name,
724 )
725 }),
726 Err(err) => {
727 log::warn!(
728 "Failed to fetch connections for initiation summary: {}",
729 err
730 );
731 None
732 }
733 }
734 };
735
736 let base_dir = Path::new(&input.output_dir);
737 let export_dir = base_dir.to_path_buf();
738 fs::create_dir_all(&export_dir).map_err(|e| HingeError::Storage(e.to_string()))?;
739
740 let messages = self.sendbird_get_full_messages(&input.channel_url).await?;
741
742 let mut transcript = String::new();
743 writeln!(transcript, "Chat with {} ({})", display_name, age_label).ok();
744 writeln!(transcript, "Channel: {}", input.channel_url).ok();
745 writeln!(transcript, "Exported at {}", Utc::now().to_rfc3339()).ok();
746 if let Some(lines) = &initiation_summary_lines {
747 for line in lines {
748 writeln!(transcript, "{line}").ok();
749 }
750 }
751 transcript.push('\n');
752
753 let mut media_files: Vec<ExportedMediaFile> = Vec::new();
754
755 if input.include_media
756 && let Some(ref content) = profile_content
757 {
758 for (idx, photo) in content.content.photos.iter().enumerate() {
759 let mut file_name = format!("profile_photo_{}", idx + 1);
760 if let Some(ext) = photo
761 .url
762 .split('.')
763 .next_back()
764 .filter(|part| part.len() <= 5)
765 {
766 file_name.push('.');
767 file_name.push_str(ext);
768 }
769 let sanitized = sanitize_component(&file_name);
770 let target_path = export_dir.join(&sanitized);
771 let bytes = self.http_get_bytes(&photo.url).await?;
772 fs::write(&target_path, &bytes).map_err(|e| HingeError::Storage(e.to_string()))?;
773 media_files.push(ExportedMediaFile {
774 message_id: format!("profile_photo_{}", idx + 1),
775 file_name: sanitized.clone(),
776 file_path: target_path.to_string_lossy().to_string(),
777 });
778 }
779 }
780
781 for message in &messages {
782 let timestamp = parse_ts(&message.created_at).unwrap_or(0);
783 let local_time: DateTime<Local> = DateTime::<Utc>::from_timestamp_millis(timestamp)
784 .map(|dt| dt.with_timezone(&Local))
785 .unwrap_or_else(Local::now);
786 let sender = if message.user.user_id == self_user_id {
787 "You".to_string()
788 } else if !message.user.nickname.is_empty() {
789 message.user.nickname.clone()
790 } else {
791 display_name.clone()
792 };
793 let body = if !message.message.trim().is_empty() {
794 message.message.clone()
795 } else if !message.data.trim().is_empty() {
796 message.data.clone()
797 } else if !message.custom_type.trim().is_empty() {
798 format!("[{} message]", message.custom_type)
799 } else {
800 "[non-text message]".into()
801 };
802
803 writeln!(
804 transcript,
805 "{} - {}: {}",
806 local_time.format("%Y-%m-%d %H:%M:%S"),
807 sender,
808 body
809 )
810 .ok();
811
812 if input.include_media
813 && let Some((url, name)) = attachment_from_value(&message.file)
814 {
815 let sanitized = sanitize_component(&name);
816 let target_path = export_dir.join(&sanitized);
817 let bytes = self.http_get_bytes(&url).await?;
818 fs::write(&target_path, &bytes).map_err(|e| HingeError::Storage(e.to_string()))?;
819 writeln!(transcript, " [Saved attachment: {}]", sanitized).ok();
820 media_files.push(ExportedMediaFile {
821 message_id: message.message_id.clone(),
822 file_name: sanitized.clone(),
823 file_path: target_path.to_string_lossy().to_string(),
824 });
825 }
826 }
827
828 let transcript_path = export_dir.join("chat.txt");
829 fs::write(&transcript_path, transcript).map_err(|e| HingeError::Storage(e.to_string()))?;
830
831 let profile_text = render_profile(
832 profile.as_ref(),
833 profile_content.as_ref(),
834 prompts_manager.as_ref(),
835 );
836 let profile_path = if !profile_text.trim().is_empty() {
837 let path = export_dir.join("profile.txt");
838 fs::write(&path, profile_text).map_err(|e| HingeError::Storage(e.to_string()))?;
839 Some(path)
840 } else {
841 None
842 };
843
844 Ok(ExportChatResult {
845 folder_path: export_dir.to_string_lossy().to_string(),
846 transcript_path: transcript_path.to_string_lossy().to_string(),
847 profile_path: profile_path.map(|p| p.to_string_lossy().to_string()),
848 message_count: messages.len().min(i32::MAX as usize) as i32,
849 media_files,
850 })
851 }
852
853 pub async fn sendbird_create_distinct_dm(
854 &mut self,
855 self_user_id: &str,
856 peer_user_id: &str,
857 data_mm: i32,
858 ) -> Result<serde_json::Value, HingeError> {
859 self.ensure_sendbird_session().await?;
860 let payload = json!({
861 "is_ephemeral": false,
862 "is_exclusive": false,
863 "data": format!("{{\n \"mm\" : {}\n}}", data_mm),
864 "user_ids": [peer_user_id, self_user_id],
865 "is_super": false,
866 "is_distinct": true,
867 "strict": false,
868 "is_broadcast": false,
869 "message_survival_seconds": -1,
870 "is_public": false
871 });
872 let url = format!(
873 "{}/v3{}",
874 self.settings.sendbird_api_url, "/group_channels?"
875 );
876 let mut headers = self.sendbird_headers()?;
877 use reqwest::header::HeaderValue;
878 headers.insert(
879 "content-type",
880 HeaderValue::from_static("application/x-www-form-urlencoded"),
881 );
882 log_request("POST", &url, &headers, Some(&payload));
883 let res = self
884 .http
885 .post(url)
886 .headers(headers)
887 .body(serde_json::to_string(&payload).unwrap_or_default())
888 .send()
889 .await?;
890 log::info!("[sendbird] POST /group_channels -> {}", res.status());
891 self.parse_response(res).await
892 }
893
894 pub async fn sendbird_get_or_create_dm_channel(
895 &mut self,
896 self_user_id: &str,
897 peer_user_id: &str,
898 ) -> Result<String, HingeError> {
899 let q = format!(
901 "/users/{}/my_group_channels?&members_exactly_in={}&show_latest_message=false&distinct_mode=all&hidden_mode=unhidden_only&show_pinned_messages=false&show_metadata=true&member_state_filter=all&user_id={}&is_explicit_request=true&public_mode=all&include_left_channel=false&show_conversation=false&show_frozen=true&is_feed_channel=false&show_delivery_receipt=true&unread_filter=all&super_mode=all&show_member=true&show_read_receipt=true&order=chronological&show_empty=true&include_chat_notification=false&limit=1",
902 self_user_id, peer_user_id, self_user_id
903 );
904 self.ensure_sendbird_session().await?;
905 let res = self.sendbird_get(&q).await?;
906 let v: serde_json::Value = self.parse_response(res).await?;
907 if let Some(url) = v
908 .get("channels")
909 .and_then(|c| c.as_array())
910 .and_then(|arr| arr.first())
911 .and_then(|c| c.get("channel_url"))
912 .and_then(|s| s.as_str())
913 {
914 return Ok(url.to_string());
915 }
916 let created = self
917 .sendbird_create_distinct_dm(self_user_id, peer_user_id, 1)
918 .await?;
919 let url = created
920 .get("channel_url")
921 .and_then(|s| s.as_str())
922 .ok_or_else(|| HingeError::Http("missing channel_url in create response".into()))?;
923 Ok(url.to_string())
924 }
925
926 pub async fn ensure_sendbird_channel_with(
927 &mut self,
928 peer_user_id: &str,
929 ) -> Result<SendbirdChannelHandle, HingeError> {
930 let self_user_id = self
931 .hinge_auth
932 .as_ref()
933 .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
934 .identity_id
935 .clone();
936 let channel_url = self
937 .sendbird_get_or_create_dm_channel(&self_user_id, peer_user_id)
938 .await?;
939 Ok(SendbirdChannelHandle { channel_url })
940 }
941
942 pub async fn sendbird_init_flow(&mut self) -> Result<serde_json::Value, HingeError> {
943 self.ensure_sendbird_session().await?;
945 let user_id = self
946 .hinge_auth
947 .as_ref()
948 .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
949 .identity_id
950 .clone();
951 let res = self.sendbird_list_my_group_channels(&user_id, 20).await?;
952 Ok(res)
953 }
954
955 pub async fn sendbird_creds(&mut self) -> Result<serde_json::Value, HingeError> {
957 if self.sendbird_auth.is_none() {
959 self.authenticate_with_sendbird().await?;
960 }
961 let app_id = self.settings.sendbird_app_id.clone();
962 let token = self
963 .sendbird_auth
964 .as_ref()
965 .map(|t| t.token.clone())
966 .unwrap_or_default();
967 Ok(serde_json::json!({
968 "appId": app_id,
969 "token": token
970 }))
971 }
972
973 pub async fn sendbird_ws_subscribe(
975 &mut self,
976 ) -> Result<
977 (
978 tokio::sync::mpsc::UnboundedSender<String>,
979 tokio::sync::broadcast::Receiver<String>,
980 ),
981 HingeError,
982 > {
983 self.ensure_sendbird_session().await?;
984 let cmd = self
985 .sendbird_ws_cmd_tx
986 .as_ref()
987 .cloned()
988 .ok_or_else(|| HingeError::Http("sendbird ws not started".into()))?;
989 let tx = self
990 .sendbird_ws_broadcast_tx
991 .as_ref()
992 .cloned()
993 .ok_or_else(|| HingeError::Http("sendbird ws broadcast not available".into()))?;
994 let rx = tx.subscribe();
995 Ok((cmd, rx))
996 }
997
998 pub async fn sendbird_ws_send_command(&mut self, command: String) -> Result<(), HingeError> {
1000 self.ensure_sendbird_session().await?;
1001 let tx = self
1002 .sendbird_ws_cmd_tx
1003 .as_ref()
1004 .cloned()
1005 .ok_or_else(|| HingeError::Http("sendbird ws not started".into()))?;
1006 tx.send(command)
1007 .map_err(|e| HingeError::Http(format!("Failed to send WS command: {}", e)))?;
1008 Ok(())
1009 }
1010
1011 pub async fn sendbird_ws_send_read(&mut self, channel_url: &str) -> Result<(), HingeError> {
1013 let req_id = Uuid::new_v4().to_string().to_uppercase();
1014 let read_command = format!(
1015 r#"READ{{"req_id":"{}","channel_url":"{}"}}"#,
1016 req_id, channel_url
1017 );
1018 self.sendbird_ws_send_command(read_command).await
1019 }
1020
1021 pub async fn sendbird_ws_send_read_and_wait(
1023 &mut self,
1024 channel_url: &str,
1025 ) -> Result<crate::models::SendbirdReadResponse, HingeError> {
1026 self.ensure_sendbird_session().await?;
1027
1028 let req_id = Uuid::new_v4().to_string().to_uppercase();
1030
1031 let (tx, rx) = tokio::sync::oneshot::channel();
1033
1034 {
1036 let mut pending = self.sendbird_ws_pending_requests.lock().await;
1037 pending.insert(req_id.clone(), tx);
1038 }
1039
1040 let read_command = format!(
1042 r#"READ{{"req_id":"{}","channel_url":"{}"}}"#,
1043 req_id, channel_url
1044 );
1045 self.sendbird_ws_send_command(read_command).await?;
1046
1047 match tokio::time::timeout(Duration::from_secs(5), rx).await {
1049 Ok(Ok(response)) => {
1050 parse_json_value_with_path(response)
1052 .map_err(|e| HingeError::Http(format!("Failed to parse READ response: {}", e)))
1053 }
1054 Ok(Err(_)) => {
1055 let mut pending = self.sendbird_ws_pending_requests.lock().await;
1057 pending.remove(&req_id);
1058 Err(HingeError::Http("READ response channel dropped".into()))
1059 }
1060 Err(_) => {
1061 let mut pending = self.sendbird_ws_pending_requests.lock().await;
1063 pending.remove(&req_id);
1064 Err(HingeError::Http("READ response timeout".into()))
1065 }
1066 }
1067 }
1068
1069 pub async fn sendbird_ws_send_ping(&mut self) -> Result<(), HingeError> {
1071 let req_id = Uuid::new_v4().to_string().to_uppercase();
1072 let ping_command = format!(r#"PING{{"req_id":"{}"}}"#, req_id);
1073 self.sendbird_ws_send_command(ping_command).await
1074 }
1075
1076 pub async fn sendbird_ws_send_typing_start(
1078 &mut self,
1079 channel_url: &str,
1080 ) -> Result<(), HingeError> {
1081 let timestamp = chrono::Utc::now().timestamp_millis();
1082 let tpst_command = format!(
1083 r#"TPST{{"req_id":null,"channel_url":"{}","time":{}}}"#,
1084 channel_url, timestamp
1085 );
1086 self.sendbird_ws_send_command(tpst_command).await
1087 }
1088
1089 pub async fn sendbird_ws_send_typing_end(
1091 &mut self,
1092 channel_url: &str,
1093 ) -> Result<(), HingeError> {
1094 let timestamp = chrono::Utc::now().timestamp_millis();
1095 let tpen_command = format!(
1096 r#"TPEN{{"req_id":null,"channel_url":"{}","time":{}}}"#,
1097 channel_url, timestamp
1098 );
1099 self.sendbird_ws_send_command(tpen_command).await
1100 }
1101
1102 pub async fn sendbird_ws_send_enter_channel(
1104 &mut self,
1105 channel_url: &str,
1106 ) -> Result<(), HingeError> {
1107 let entr_command = format!(r#"ENTR{{"req_id":null,"channel_url":"{}"}}"#, channel_url);
1108 self.sendbird_ws_send_command(entr_command).await
1109 }
1110
1111 pub async fn sendbird_ws_send_exit_channel(
1113 &mut self,
1114 channel_url: &str,
1115 ) -> Result<(), HingeError> {
1116 let exit_command = format!(r#"EXIT{{"req_id":null,"channel_url":"{}"}}"#, channel_url);
1117 self.sendbird_ws_send_command(exit_command).await
1118 }
1119
1120 pub async fn sendbird_ws_send_message_ack(
1122 &mut self,
1123 channel_url: &str,
1124 message_id: &str,
1125 ) -> Result<(), HingeError> {
1126 let mack_command = format!(
1127 r#"MACK{{"req_id":null,"channel_url":"{}","msg_id":"{}"}}"#,
1128 channel_url, message_id
1129 );
1130 self.sendbird_ws_send_command(mack_command).await
1131 }
1132
1133 pub async fn sendbird_ws_close(
1135 &mut self,
1136 code: Option<u16>,
1137 reason: Option<String>,
1138 ) -> Result<(), HingeError> {
1139 if let Some(ref tx) = self.sendbird_ws_cmd_tx {
1141 let close_code = code.unwrap_or(1000); let close_reason = reason.unwrap_or_else(|| "Client initiated close".to_string());
1144
1145 let close_command = format!("__CLOSE__:{}:{}", close_code, close_reason);
1148 let _ = tx.send(close_command);
1149
1150 log::info!(
1151 "[sendbird ws] Closing connection with code {} reason: {}",
1152 close_code,
1153 close_reason
1154 );
1155 }
1156
1157 self.sendbird_ws_cmd_tx = None;
1159 self.sendbird_ws_broadcast_tx = None;
1160 self.sendbird_ws_connected = false;
1161
1162 let mut pending = self.sendbird_ws_pending_requests.lock().await;
1164 pending.clear();
1165
1166 Ok(())
1167 }
1168
1169 pub async fn sendbird_ws_ensure_connected(&mut self) -> Result<bool, HingeError> {
1171 if self.sendbird_ws_cmd_tx.is_some() {
1173 if self.sendbird_ws_send_ping().await.is_ok() {
1175 return Ok(true);
1176 }
1177 }
1178
1179 self.sendbird_ws_cmd_tx = None;
1181 self.sendbird_ws_broadcast_tx = None;
1182
1183 log::info!("[sendbird ws] Reconnecting WebSocket...");
1185 self.start_sendbird_ws().await?;
1186 Ok(true)
1187 }
1188}