Skip to main content

hinge_rs/client/
sendbird.rs

1use super::HingeClient;
2use super::render::{render_profile, summarize_connection_initiation};
3use super::serde_helpers::{
4    attachment_from_value, parse_json_value_with_path, parse_ts, sanitize_component,
5};
6use crate::errors::HingeError;
7use crate::logging::log_request;
8use crate::models::{
9    ExportChatInput, ExportChatResult, ExportedMediaFile, SendbirdChannelHandle,
10    SendbirdGroupChannel, SendbirdMessage,
11};
12use crate::storage::Storage;
13use chrono::{DateTime, Local, Utc};
14use futures_util::{SinkExt, StreamExt};
15use serde_json::json;
16use std::cmp::min;
17use std::collections::HashSet;
18use std::fmt::Write as FmtWrite;
19use std::fs;
20use std::path::Path;
21use std::time::Duration;
22use tokio_tungstenite::tungstenite::Message;
23use uuid::Uuid;
24
/// Value sent as the `accept-language` header on Sendbird REST requests and
/// as `Accept-Language` on the WebSocket handshake.
25pub(super) const SENDBIRD_ACCEPT_LANGUAGE: &str = "en-GB";
/// Value sent as the `user-agent` header on Sendbird REST requests.
26pub(super) const SENDBIRD_REST_USER_AGENT: &str = "Jios/4.26.0";
/// Value sent as the `Origin` header on the Sendbird WebSocket handshake.
27pub(super) const SENDBIRD_WS_ORIGIN: &str = "https://web-sb-kr-7-704.sendbird.com";
28
29impl<S: Storage + Clone> HingeClient<S> {
30    pub(super) fn sendbird_header_value(&self) -> String {
31        format!(
32            "iOS,{},{},{}",
33            self.settings.os_version,
34            self.settings.sendbird_sdk_version,
35            self.settings.sendbird_app_id
36        )
37    }
38
39    pub(super) fn sendbird_user_agent_value(&self) -> String {
40        format!("iOS/c{}///", self.settings.sendbird_sdk_version)
41    }
42
43    pub(super) fn sendbird_sdk_user_agent_value(&self) -> String {
44        format!(
45            "main_sdk_info=chat/ios/{}&device_os_platform=ios&os_version={}",
46            self.settings.sendbird_sdk_version, self.settings.os_version
47        )
48    }
49
50    pub(super) fn sendbird_headers(&self) -> Result<reqwest::header::HeaderMap, HingeError> {
51        use reqwest::header::{HeaderMap, HeaderValue};
52        let mut h = HeaderMap::new();
53        h.insert("accept", HeaderValue::from_static("*/*"));
54        h.insert("connection", HeaderValue::from_static("keep-alive"));
55        h.insert(
56            "accept-language",
57            HeaderValue::from_static(SENDBIRD_ACCEPT_LANGUAGE),
58        );
59        h.insert(
60            "x-session-key",
61            HeaderValue::from_str(&self.session_id)
62                .map_err(|e| HingeError::Http(format!("Invalid session key header: {}", e)))?,
63        );
64        h.insert(
65            "x-device-id",
66            HeaderValue::from_str(&self.device_id)
67                .map_err(|e| HingeError::Http(format!("Invalid device id header: {}", e)))?,
68        );
69        h.insert(
70            "x-install-id",
71            HeaderValue::from_str(&self.install_id)
72                .map_err(|e| HingeError::Http(format!("Invalid install id header: {}", e)))?,
73        );
74        h.insert(
75            "sb-user-id",
76            HeaderValue::from_str(
77                self.hinge_auth
78                    .as_ref()
79                    .map(|token| token.identity_id.as_str())
80                    .unwrap_or_default(),
81            )
82            .map_err(|e| HingeError::Http(format!("Invalid sb-user-id header: {}", e)))?,
83        );
84        if let Some(sb_auth) = &self.sendbird_auth {
85            h.insert(
86                "sb-access-token",
87                HeaderValue::from_str(&sb_auth.token)
88                    .map_err(|e| HingeError::Http(format!("Invalid sb-access-token: {}", e)))?,
89            );
90        }
91        if let Some(session_key) = &self.sendbird_session_key {
92            h.insert(
93                "Session-Key",
94                HeaderValue::from_str(session_key)
95                    .map_err(|e| HingeError::Http(format!("Invalid session key: {}", e)))?,
96            );
97        }
98        // Timestamp header present in logs
99        let ts = chrono::Utc::now().timestamp_millis();
100        h.insert(
101            "Request-Sent-Timestamp",
102            HeaderValue::from_str(&ts.to_string())
103                .map_err(|e| HingeError::Http(format!("Invalid timestamp: {}", e)))?,
104        );
105        // SDK-identifying headers observed in logs
106        h.insert(
107            "SendBird",
108            HeaderValue::from_str(&self.sendbird_header_value())
109                .map_err(|e| HingeError::Http(format!("Invalid SendBird header: {}", e)))?,
110        );
111        h.insert(
112            "SB-User-Agent",
113            HeaderValue::from_str(&self.sendbird_user_agent_value())
114                .map_err(|e| HingeError::Http(format!("Invalid SB-User-Agent: {}", e)))?,
115        );
116        h.insert(
117            "SB-SDK-User-Agent",
118            HeaderValue::from_str(&self.sendbird_sdk_user_agent_value())
119                .map_err(|e| HingeError::Http(format!("Invalid SB-SDK-User-Agent: {}", e)))?,
120        );
121        h.insert(
122            "user-agent",
123            HeaderValue::from_static(SENDBIRD_REST_USER_AGENT),
124        );
125        Ok(h)
126    }
127
128    async fn sendbird_get(&self, path_and_query: &str) -> Result<reqwest::Response, HingeError> {
129        let url = format!("{}/v3{}", self.settings.sendbird_api_url, path_and_query);
130        let headers = self.sendbird_headers()?;
131        log_request("GET", &url, &headers, None);
132        let res = self.http.get(url).headers(headers.clone()).send().await?;
133        log::info!("[sendbird] GET {} -> {}", path_and_query, res.status());
134        Ok(res)
135    }
136
137    async fn ensure_sendbird_session(&mut self) -> Result<(), HingeError> {
138        // If a WS is already connected, we're good
139        if self.sendbird_ws_connected {
140            return Ok(());
141        }
142
143        // Ensure we have Sendbird JWT from Hinge
144        if self.sendbird_auth.is_none() {
145            self.authenticate_with_sendbird().await?;
146        }
147
148        // Start and hold a single WS connection; capture LOGI and broadcast frames
149        let (cmd_tx, broadcast_tx) = self.start_sendbird_ws().await?;
150        self.sendbird_ws_cmd_tx = Some(cmd_tx);
151        self.sendbird_ws_broadcast_tx = Some(broadcast_tx);
152        self.sendbird_ws_connected = true;
153        Ok(())
154    }
155
156    async fn start_sendbird_ws(
157        &mut self,
158    ) -> Result<
159        (
160            tokio::sync::mpsc::UnboundedSender<String>,
161            tokio::sync::broadcast::Sender<String>,
162        ),
163        HingeError,
164    > {
165        let sb = self
166            .sendbird_auth
167            .as_ref()
168            .ok_or_else(|| HingeError::Auth("sendbird token missing".into()))?;
169        let user_id = self
170            .hinge_auth
171            .as_ref()
172            .map(|t| t.identity_id.clone())
173            .unwrap_or_default();
174        let ws_url = format!(
175            "{}/?p=iOS&sv={}&pv={}&uikit_config=0&use_local_cache=0&include_extra_data=premium_feature_list,file_upload_size_limit,emoji_hash,application_attributes,notifications,message_template,ai_agent&include_poll_details=1&user_id={}&ai={}&pmce=1&expiring_session=0&config_ts=0",
176            self.settings.sendbird_ws_url,
177            self.settings.sendbird_sdk_version,
178            self.settings.os_version,
179            user_id,
180            self.settings.sendbird_app_id
181        );
182        let ws_ts = chrono::Utc::now().timestamp_millis().to_string();
183        let host = ws_url
184            .trim_start_matches("wss://")
185            .trim_start_matches("ws://")
186            .split('/')
187            .next()
188            .unwrap_or("");
189        let ws_key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
190        let mut builder = tokio_tungstenite::tungstenite::http::Request::builder().uri(&ws_url);
191        if let Some(sk) = &self.sendbird_session_key {
192            builder = builder.header("SENDBIRD-WS-AUTH", sk);
193        } else {
194            builder = builder.header("SENDBIRD-WS-TOKEN", sb.token.clone());
195        }
196        builder = builder
197            .header("Accept", "*/*")
198            .header("Accept-Encoding", "gzip, deflate")
199            .header("Sec-WebSocket-Extensions", "permessage-deflate")
200            .header("Sec-WebSocket-Key", &ws_key)
201            .header("Sec-WebSocket-Version", "13")
202            .header("Request-Sent-Timestamp", &ws_ts)
203            .header("Host", host)
204            .header("Origin", SENDBIRD_WS_ORIGIN)
205            .header("Accept-Language", SENDBIRD_ACCEPT_LANGUAGE)
206            .header("x-session-key", &self.session_id)
207            .header("x-device-id", &self.device_id)
208            .header("x-install-id", &self.install_id)
209            .header("sb-user-id", &user_id)
210            .header("sb-access-token", &sb.token)
211            .header("SendBird", self.sendbird_header_value())
212            .header("SB-User-Agent", self.sendbird_user_agent_value())
213            .header("SB-SDK-User-Agent", self.sendbird_sdk_user_agent_value())
214            .header("Connection", "Upgrade")
215            .header("Upgrade", "websocket")
216            .header("User-Agent", &self.hinge_user_agent());
217        // Log WS request (redacted)
218        {
219            let mut pairs: Vec<(String, String)> = Vec::new();
220            if let Some(sk) = &self.sendbird_session_key {
221                pairs.push(("SENDBIRD-WS-AUTH".into(), sk.clone()));
222            } else {
223                pairs.push(("SENDBIRD-WS-TOKEN".into(), sb.token.clone()));
224            }
225            pairs.push(("Accept".into(), "*/*".into()));
226            pairs.push(("Accept-Encoding".into(), "gzip, deflate".into()));
227            pairs.push((
228                "Sec-WebSocket-Extensions".into(),
229                "permessage-deflate".into(),
230            ));
231            pairs.push(("Accept-Language".into(), SENDBIRD_ACCEPT_LANGUAGE.into()));
232            pairs.push(("Host".into(), host.to_string()));
233            pairs.push(("Origin".into(), SENDBIRD_WS_ORIGIN.into()));
234            pairs.push(("Sec-WebSocket-Key".into(), ws_key.clone()));
235            pairs.push(("Sec-WebSocket-Version".into(), "13".into()));
236            pairs.push(("Request-Sent-Timestamp".into(), ws_ts.clone()));
237            pairs.push(("x-session-key".into(), self.session_id.clone()));
238            pairs.push(("x-device-id".into(), self.device_id.clone()));
239            pairs.push(("x-install-id".into(), self.install_id.clone()));
240            pairs.push(("sb-user-id".into(), user_id.clone()));
241            pairs.push(("sb-access-token".into(), sb.token.clone()));
242            pairs.push(("SendBird".into(), self.sendbird_header_value()));
243            pairs.push(("SB-User-Agent".into(), self.sendbird_user_agent_value()));
244            pairs.push((
245                "SB-SDK-User-Agent".into(),
246                self.sendbird_sdk_user_agent_value(),
247            ));
248            pairs.push(("Connection".into(), "Upgrade".into()));
249            pairs.push(("Upgrade".into(), "websocket".into()));
250            pairs.push(("User-Agent".into(), self.hinge_user_agent()));
251            log::info!("━━━━━━━━━━ WS REQUEST ━━━━━━━━━━");
252            log::info!("GET {}", ws_url);
253            log::debug!("Headers:\n{}", crate::logging::format_ws_headers(&pairs));
254            log::info!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
255        }
256
257        let req: tokio_tungstenite::tungstenite::http::Request<()> = builder
258            .body(())
259            .map_err(|e| HingeError::Http(e.to_string()))?;
260        let (ws, _resp) = tokio_tungstenite::connect_async(req)
261            .await
262            .map_err(|e| HingeError::Http(e.to_string()))?;
263        let (write_half, mut read_half) = ws.split();
264        let write_half = std::sync::Arc::new(tokio::sync::Mutex::new(write_half));
265
266        let (tx_cmd, mut rx_cmd) = tokio::sync::mpsc::unbounded_channel::<String>();
267        let (tx_broadcast, _rx_broadcast) = tokio::sync::broadcast::channel::<String>(1024);
268        let (sk_tx, sk_rx) = tokio::sync::oneshot::channel::<String>();
269
270        // Reader: capture LOGI to set Session-Key; forward frames; respond to Ping
271        {
272            let write_for_pong = write_half.clone();
273            let tx_broadcast_c = tx_broadcast.clone();
274            let pending_requests = self.sendbird_ws_pending_requests.clone();
275            tokio::spawn(async move {
276                let mut sk_tx_opt = Some(sk_tx);
277                while let Some(msg) = read_half.next().await {
278                    match msg {
279                        Ok(Message::Ping(_)) => {
280                            let mut w = write_for_pong.lock().await;
281                            let _ = w.send(Message::Pong(Vec::new().into())).await;
282                        }
283                        Ok(Message::Text(t)) => {
284                            let t = t.to_string();
285                            // Handle LOGI frame - extract session key
286                            if t.starts_with("LOGI")
287                                && let Some(start) = t.find('{')
288                                && let Ok(val) =
289                                    serde_json::from_str::<serde_json::Value>(&t[start..])
290                            {
291                                if let Some(k) = crate::ws::sendbird_logi_session_key(&val) {
292                                    let _ = tx_broadcast_c.send(format!("__SESSION_KEY__:{}", k));
293                                    if let Some(tx) = sk_tx_opt.take() {
294                                        let _ = tx.send(k.to_string());
295                                    }
296                                }
297                                // Log important LOGI fields
298                                log::info!(
299                                    "[sendbird ws] LOGI received - user_id: {}, ping_interval: {}, pong_timeout: {}",
300                                    val.get("user_id")
301                                        .and_then(|v| v.as_str())
302                                        .unwrap_or("unknown"),
303                                    val.get("ping_interval")
304                                        .and_then(|v| v.as_i64())
305                                        .unwrap_or(0),
306                                    val.get("pong_timeout")
307                                        .and_then(|v| v.as_i64())
308                                        .unwrap_or(0)
309                                );
310                            }
311                            // Handle PING frame - respond with PONG
312                            else if t.starts_with("PING") {
313                                log::debug!("[sendbird ws] Received PING, sending PONG");
314                                if let Some(start) = t.find('{')
315                                    && let Ok(_val) =
316                                        serde_json::from_str::<serde_json::Value>(&t[start..])
317                                {
318                                    let pong_response = json!({
319                                        "sts": chrono::Utc::now().timestamp_millis(),
320                                        "ts": chrono::Utc::now().timestamp_millis()
321                                    });
322                                    let pong_msg = format!("PONG{}", pong_response);
323                                    let mut w = write_for_pong.lock().await;
324                                    let _ = w.send(Message::Text(pong_msg.into())).await;
325                                }
326                            }
327                            // Handle READ acknowledgments
328                            else if t.starts_with("READ") && t.contains("channel_id") {
329                                log::debug!("[sendbird ws] Received READ acknowledgment");
330                                // Check if this is a response to a pending request
331                                if let Some(start) = t.find('{')
332                                    && let Ok(val) =
333                                        serde_json::from_str::<serde_json::Value>(&t[start..])
334                                    && let Some(req_id) = val.get("req_id").and_then(|v| v.as_str())
335                                {
336                                    let mut pending = pending_requests.lock().await;
337                                    if let Some(tx) = pending.remove(req_id) {
338                                        let _ = tx.send(val.clone());
339                                        log::debug!(
340                                            "[sendbird ws] Matched READ response for req_id: {}",
341                                            req_id
342                                        );
343                                    }
344                                }
345                            }
346                            // Broadcast all messages to subscribers
347                            // Parse SYEV (system event) frames for structured typing events
348                            if t.starts_with("SYEV")
349                                && let Some(start) = t.find('{')
350                                && let Ok(val) =
351                                    serde_json::from_str::<serde_json::Value>(&t[start..])
352                            {
353                                // Broadcast raw
354                                let _ = tx_broadcast_c.send(t.clone());
355                                // Try to parse into structured model
356                                if let Ok(evt) = serde_json::from_value::<
357                                    crate::models::SendbirdSyevEvent,
358                                >(val.clone())
359                                {
360                                    // Typing start/end logging
361                                    if evt.cat
362                                        == crate::models::SendbirdSyevEvent::CATEGORY_TYPING_START
363                                    {
364                                        log::debug!(
365                                            "[sendbird ws] SYEV typing start user={} channel={}",
366                                            evt.data
367                                                .as_ref()
368                                                .map(|u| u.user_id.as_str())
369                                                .unwrap_or("unknown"),
370                                            evt.channel_url
371                                        );
372                                    } else if evt.cat
373                                        == crate::models::SendbirdSyevEvent::CATEGORY_TYPING_END
374                                    {
375                                        log::debug!(
376                                            "[sendbird ws] SYEV typing end user={} channel={}",
377                                            evt.data
378                                                .as_ref()
379                                                .map(|u| u.user_id.as_str())
380                                                .unwrap_or("unknown"),
381                                            evt.channel_url
382                                        );
383                                    }
384                                    // Broadcast structured event for consumers
385                                    if let Ok(json_evt) = serde_json::to_string(&evt) {
386                                        let _ =
387                                            tx_broadcast_c.send(format!("__SYEV__:{}", json_evt));
388                                    }
389                                    continue;
390                                }
391                            }
392                            let _ = tx_broadcast_c.send(t);
393                        }
394                        Ok(Message::Binary(b)) => {
395                            let _ = tx_broadcast_c.send(String::from_utf8_lossy(&b).into_owned());
396                        }
397                        Ok(Message::Pong(_)) => {}
398                        Ok(Message::Close(frame)) => {
399                            if let Some(cf) = frame {
400                                let code_u16: u16 = cf.code.into();
401
402                                // Analyze if this could be time-based
403                                // Your observation: LOGI at 8:03:11.826, Close at 8:03:12.526, code 55409
404                                // Theory: The code might be derived from timestamp
405                                let now = chrono::Utc::now();
406                                let ms_timestamp = now.timestamp_millis();
407
408                                // Various time-based calculations that might match
409                                let last_5_of_ms = (ms_timestamp % 100000) as u16;
410                                let last_5_of_seconds = ((ms_timestamp / 1000) % 100000) as u16;
411                                let seconds_today = (now.timestamp() % 86400) as u16;
412                                let ms_today = ((now.timestamp() % 86400) * 1000
413                                    + now.timestamp_subsec_millis() as i64)
414                                    as u32;
415                                let ms_today_mod = (ms_today % 65536) as u16; // Fit in u16
416
417                                log::debug!(
418                                    "[sendbird ws] Time analysis - code: {}, last5_ms: {}, last5_sec: {}, sec_today: {}, ms_today_mod: {}",
419                                    code_u16,
420                                    last_5_of_ms,
421                                    last_5_of_seconds,
422                                    seconds_today,
423                                    ms_today_mod
424                                );
425
426                                let code_desc = match code_u16 {
427                                    // Standard WebSocket close codes (1000-4999)
428                                    1000 => "Normal closure",
429                                    1001 => "Going away",
430                                    1002 => "Protocol error",
431                                    1003 => "Unsupported data",
432                                    1006 => "Abnormal closure",
433                                    1008 => "Policy violation",
434                                    1009 => "Message too big",
435                                    1010 => "Mandatory extension",
436                                    1011 => "Internal server error",
437                                    1015 => "TLS handshake failure",
438                                    // Sendbird appears to use dynamic codes possibly derived from timestamps
439                                    _ if code_u16 >= 10000 => {
440                                        "Sendbird dynamic code (possibly time-derived)"
441                                    }
442                                    _ => "Non-standard close code",
443                                };
444                                log::warn!(
445                                    "[sendbird ws] Connection closed - code: {} ({}), reason: {}",
446                                    code_u16,
447                                    code_desc,
448                                    cf.reason
449                                );
450                                let _ = tx_broadcast_c
451                                    .send(format!("__CLOSE__:{}:{}", code_u16, cf.reason));
452
453                                // Since Sendbird uses dynamic codes, we can't determine reconnection strategy from the code
454                                // The reason string might be more informative than the code itself
455                                if !cf.reason.is_empty() {
456                                    log::info!(
457                                        "[sendbird ws] Close reason provided: {}",
458                                        cf.reason
459                                    );
460                                }
461                            } else {
462                                log::warn!("[sendbird ws] Connection closed without frame");
463                                let _ = tx_broadcast_c.send("__CLOSE__".into());
464                            }
465                            break;
466                        }
467                        Ok(_) => {}
468                        Err(e) => {
469                            log::error!("[sendbird ws] WebSocket error: {}", e);
470                            let _ = tx_broadcast_c.send(format!("__ERROR__:{}", e));
471                            break;
472                        }
473                    }
474                }
475            });
476        }
477
478        // Writer: forward commands to WS
479        {
480            let write_for_cmds = write_half.clone();
481            tokio::spawn(async move {
482                while let Some(cmd) = rx_cmd.recv().await {
483                    let mut w = write_for_cmds.lock().await;
484
485                    // Check if this is a special close command
486                    if cmd.starts_with("__CLOSE__:") {
487                        // Parse the close code and reason
488                        let parts: Vec<&str> = cmd
489                            .strip_prefix("__CLOSE__:")
490                            .unwrap_or("")
491                            .split(':')
492                            .collect();
493                        let code = parts
494                            .first()
495                            .and_then(|s| s.parse::<u16>().ok())
496                            .unwrap_or(1000);
497                        let reason = parts.get(1).unwrap_or(&"").to_string();
498
499                        // Send WebSocket Close frame
500                        let close_frame = tokio_tungstenite::tungstenite::protocol::CloseFrame {
501                            code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(code),
502                            reason: reason.into(),
503                        };
504                        let _ = w.send(Message::Close(Some(close_frame))).await;
505                        break; // Stop the writer task after sending close
506                    } else {
507                        // Regular text command
508                        let _ = w.send(Message::Text(cmd.into())).await;
509                    }
510                }
511            });
512        }
513
514        // Wait for LOGI to deliver session key before returning so REST can use it
515        if let Ok(k) = sk_rx.await {
516            self.sendbird_session_key = Some(k);
517            log::info!("[sendbird] Session-Key captured");
518            if let Some(path) = &self.session_path {
519                let _ = self.save_session(path);
520            }
521        } else {
522            log::warn!("Sendbird LOGI not received before startup return");
523        }
524        Ok((tx_cmd, tx_broadcast))
525    }
526
527    pub async fn sendbird_list_my_group_channels(
528        &mut self,
529        user_id: &str,
530        limit: usize,
531    ) -> Result<serde_json::Value, HingeError> {
532        self.ensure_sendbird_session().await?;
533        let q = format!(
534            "/users/{}/my_group_channels?&include_left_channel=false&member_state_filter=all&super_mode=all&show_latest_message=false&show_pinned_messages=false&unread_filter=all&show_delivery_receipt=true&show_conversation=false&show_member=true&show_empty=true&limit={}&user_id={}&is_feed_channel=false&order=latest_last_message&hidden_mode=unhidden_only&distinct_mode=all&show_read_receipt=true&show_metadata=true&is_explicit_request=true&show_frozen=true&public_mode=all&include_chat_notification=false",
535            user_id, limit, user_id
536        );
537        let res = self.sendbird_get(&q).await?;
538        self.parse_response(res).await
539    }
540
541    pub async fn sendbird_list_channels_typed(
542        &mut self,
543        limit: usize,
544    ) -> Result<crate::models::SendbirdChannelsResponse, HingeError> {
545        let user_id = self
546            .hinge_auth
547            .as_ref()
548            .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
549            .identity_id
550            .clone();
551        let limit = limit.clamp(1, 200);
552        let raw = self
553            .sendbird_list_my_group_channels(&user_id, limit)
554            .await?;
555        parse_json_value_with_path(raw).map_err(|e| {
556            HingeError::Serde(format!("Failed to parse Sendbird channels response: {}", e))
557        })
558    }
559
560    pub async fn sendbird_get_channel(
561        &mut self,
562        channel_url: &str,
563    ) -> Result<serde_json::Value, HingeError> {
564        self.ensure_sendbird_session().await?;
565        let q = format!(
566            "/sdk/group_channels/{}?&is_feed_channel=false&show_latest_message=false&show_metadata=false&show_empty=false&show_member=true&show_frozen=false&show_read_receipt=true&show_pinned_messages=false&include_chat_notification=false&show_delivery_receipt=true&show_conversation=true",
567            channel_url
568        );
569        let res = self.sendbird_get(&q).await?;
570        self.parse_response(res).await
571    }
572
573    pub async fn sendbird_get_channel_typed(
574        &mut self,
575        channel_url: &str,
576    ) -> Result<SendbirdGroupChannel, HingeError> {
577        let value = self.sendbird_get_channel(channel_url).await?;
578        parse_json_value_with_path(value)
579            .map_err(|e| HingeError::Serde(format!("Failed to parse channel: {}", e)))
580    }
581
582    pub async fn sendbird_get_messages(
583        &mut self,
584        channel_url: &str,
585        message_ts: i64,
586        prev_limit: usize,
587    ) -> Result<crate::models::SendbirdMessagesResponse, HingeError> {
588        self.ensure_sendbird_session().await?;
589        let q = format!(
590            "/group_channels/{}/messages?&include_reply_type=all&sdk_source=external_legacy&with_sorted_meta_array=true&message_ts={}&is_sdk=true&include_reactions_summary=true&include_parent_message_info=false&reverse=true&prev_limit={}&custom_types=%2A&include=false&next_limit=0&include_poll_details=true&show_subchannel_messages_only=false&include_thread_info=false",
591            channel_url, message_ts, prev_limit
592        );
593        let res = self.sendbird_get(&q).await?;
594        self.parse_response(res).await
595    }
596
597    pub async fn sendbird_get_full_messages(
598        &mut self,
599        channel_url: &str,
600    ) -> Result<Vec<SendbirdMessage>, HingeError> {
601        self.ensure_sendbird_session().await?;
602        const PAGE_SIZE: usize = 120;
603        let mut anchor = chrono::Utc::now().timestamp_millis();
604        let mut seen: HashSet<String> = HashSet::new();
605        let mut collected: Vec<(i64, SendbirdMessage)> = Vec::new();
606
607        loop {
608            let batch = self
609                .sendbird_get_messages(channel_url, anchor, PAGE_SIZE)
610                .await?;
611            if batch.messages.is_empty() {
612                break;
613            }
614            let mut earliest = anchor;
615            let mut added = 0usize;
616            for message in batch.messages {
617                if seen.insert(message.message_id.clone()) {
618                    let ts = parse_ts(&message.created_at).unwrap_or(anchor);
619                    earliest = min(earliest, ts.saturating_sub(1));
620                    collected.push((ts, message));
621                    added += 1;
622                }
623            }
624            if added == 0 {
625                break;
626            }
627            if earliest >= anchor || earliest <= 0 {
628                break;
629            }
630            anchor = earliest;
631            if collected.len() >= 4000 {
632                log::warn!(
633                    "[sendbird] Stopping history fetch after {} messages to avoid huge exports",
634                    collected.len()
635                );
636                break;
637            }
638        }
639
640        collected.sort_by_key(|(ts, _)| *ts);
641        Ok(collected.into_iter().map(|(_, msg)| msg).collect())
642    }
643
644    pub async fn export_chat(
645        &mut self,
646        input: ExportChatInput,
647    ) -> Result<ExportChatResult, HingeError> {
648        self.ensure_sendbird_session().await?;
649        let auth = self
650            .hinge_auth
651            .as_ref()
652            .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
653            .clone();
654        let self_user_id = auth.identity_id.clone();
655
656        let prompts_manager = match self.fetch_prompts_manager().await {
657            Ok(mgr) => Some(mgr),
658            Err(err) => {
659                log::warn!("Failed to prefetch prompts for export: {}", err);
660                None
661            }
662        };
663
664        let channel = self.sendbird_get_channel_typed(&input.channel_url).await?;
665        let partner = channel
666            .members
667            .iter()
668            .find(|member| !member.user_id.is_empty() && member.user_id != self_user_id)
669            .cloned()
670            .ok_or_else(|| HingeError::Http("unable to determine conversation partner".into()))?;
671
672        let peer_id = partner.user_id.clone();
673        let profile = self
674            .get_profiles(vec![peer_id.clone()])
675            .await?
676            .into_iter()
677            .next();
678        let profile_content = self
679            .get_profile_content(vec![peer_id.clone()])
680            .await?
681            .into_iter()
682            .next();
683
684        let display_name = profile
685            .as_ref()
686            .map(|p| p.profile.first_name.clone())
687            .filter(|name| !name.trim().is_empty())
688            .or_else(|| {
689                if !partner.nickname.trim().is_empty() {
690                    Some(partner.nickname.clone())
691                } else {
692                    None
693                }
694            })
695            .unwrap_or_else(|| peer_id.clone());
696
697        let age_label = profile
698            .as_ref()
699            .and_then(|p| p.profile.age)
700            .map(|age| age.to_string())
701            .unwrap_or_else(|| "Unknown age".to_string());
702
703        let initiation_summary_lines = if let Some(lines) = input.initiation_summary_lines.clone() {
704            if lines.is_empty() { None } else { Some(lines) }
705        } else {
706            match self.get_connections_v2().await {
707                Ok(resp) => resp
708                    .connections
709                    .into_iter()
710                    .find(|conn| {
711                        let initiator = conn.initiator_id.trim();
712                        let subject = conn.subject_id.trim();
713                        (!initiator.is_empty() && initiator == self_user_id && subject == peer_id)
714                            || (!subject.is_empty()
715                                && subject == self_user_id
716                                && initiator == peer_id)
717                    })
718                    .and_then(|conn| {
719                        summarize_connection_initiation(
720                            &conn,
721                            &self_user_id,
722                            &peer_id,
723                            &display_name,
724                        )
725                    }),
726                Err(err) => {
727                    log::warn!(
728                        "Failed to fetch connections for initiation summary: {}",
729                        err
730                    );
731                    None
732                }
733            }
734        };
735
736        let base_dir = Path::new(&input.output_dir);
737        let export_dir = base_dir.to_path_buf();
738        fs::create_dir_all(&export_dir).map_err(|e| HingeError::Storage(e.to_string()))?;
739
740        let messages = self.sendbird_get_full_messages(&input.channel_url).await?;
741
742        let mut transcript = String::new();
743        writeln!(transcript, "Chat with {} ({})", display_name, age_label).ok();
744        writeln!(transcript, "Channel: {}", input.channel_url).ok();
745        writeln!(transcript, "Exported at {}", Utc::now().to_rfc3339()).ok();
746        if let Some(lines) = &initiation_summary_lines {
747            for line in lines {
748                writeln!(transcript, "{line}").ok();
749            }
750        }
751        transcript.push('\n');
752
753        let mut media_files: Vec<ExportedMediaFile> = Vec::new();
754
755        if input.include_media
756            && let Some(ref content) = profile_content
757        {
758            for (idx, photo) in content.content.photos.iter().enumerate() {
759                let mut file_name = format!("profile_photo_{}", idx + 1);
760                if let Some(ext) = photo
761                    .url
762                    .split('.')
763                    .next_back()
764                    .filter(|part| part.len() <= 5)
765                {
766                    file_name.push('.');
767                    file_name.push_str(ext);
768                }
769                let sanitized = sanitize_component(&file_name);
770                let target_path = export_dir.join(&sanitized);
771                let bytes = self.http_get_bytes(&photo.url).await?;
772                fs::write(&target_path, &bytes).map_err(|e| HingeError::Storage(e.to_string()))?;
773                media_files.push(ExportedMediaFile {
774                    message_id: format!("profile_photo_{}", idx + 1),
775                    file_name: sanitized.clone(),
776                    file_path: target_path.to_string_lossy().to_string(),
777                });
778            }
779        }
780
781        for message in &messages {
782            let timestamp = parse_ts(&message.created_at).unwrap_or(0);
783            let local_time: DateTime<Local> = DateTime::<Utc>::from_timestamp_millis(timestamp)
784                .map(|dt| dt.with_timezone(&Local))
785                .unwrap_or_else(Local::now);
786            let sender = if message.user.user_id == self_user_id {
787                "You".to_string()
788            } else if !message.user.nickname.is_empty() {
789                message.user.nickname.clone()
790            } else {
791                display_name.clone()
792            };
793            let body = if !message.message.trim().is_empty() {
794                message.message.clone()
795            } else if !message.data.trim().is_empty() {
796                message.data.clone()
797            } else if !message.custom_type.trim().is_empty() {
798                format!("[{} message]", message.custom_type)
799            } else {
800                "[non-text message]".into()
801            };
802
803            writeln!(
804                transcript,
805                "{} - {}: {}",
806                local_time.format("%Y-%m-%d %H:%M:%S"),
807                sender,
808                body
809            )
810            .ok();
811
812            if input.include_media
813                && let Some((url, name)) = attachment_from_value(&message.file)
814            {
815                let sanitized = sanitize_component(&name);
816                let target_path = export_dir.join(&sanitized);
817                let bytes = self.http_get_bytes(&url).await?;
818                fs::write(&target_path, &bytes).map_err(|e| HingeError::Storage(e.to_string()))?;
819                writeln!(transcript, "    [Saved attachment: {}]", sanitized).ok();
820                media_files.push(ExportedMediaFile {
821                    message_id: message.message_id.clone(),
822                    file_name: sanitized.clone(),
823                    file_path: target_path.to_string_lossy().to_string(),
824                });
825            }
826        }
827
828        let transcript_path = export_dir.join("chat.txt");
829        fs::write(&transcript_path, transcript).map_err(|e| HingeError::Storage(e.to_string()))?;
830
831        let profile_text = render_profile(
832            profile.as_ref(),
833            profile_content.as_ref(),
834            prompts_manager.as_ref(),
835        );
836        let profile_path = if !profile_text.trim().is_empty() {
837            let path = export_dir.join("profile.txt");
838            fs::write(&path, profile_text).map_err(|e| HingeError::Storage(e.to_string()))?;
839            Some(path)
840        } else {
841            None
842        };
843
844        Ok(ExportChatResult {
845            folder_path: export_dir.to_string_lossy().to_string(),
846            transcript_path: transcript_path.to_string_lossy().to_string(),
847            profile_path: profile_path.map(|p| p.to_string_lossy().to_string()),
848            message_count: messages.len().min(i32::MAX as usize) as i32,
849            media_files,
850        })
851    }
852
853    pub async fn sendbird_create_distinct_dm(
854        &mut self,
855        self_user_id: &str,
856        peer_user_id: &str,
857        data_mm: i32,
858    ) -> Result<serde_json::Value, HingeError> {
859        self.ensure_sendbird_session().await?;
860        let payload = json!({
861            "is_ephemeral": false,
862            "is_exclusive": false,
863            "data": format!("{{\n  \"mm\" : {}\n}}", data_mm),
864            "user_ids": [peer_user_id, self_user_id],
865            "is_super": false,
866            "is_distinct": true,
867            "strict": false,
868            "is_broadcast": false,
869            "message_survival_seconds": -1,
870            "is_public": false
871        });
872        let url = format!(
873            "{}/v3{}",
874            self.settings.sendbird_api_url, "/group_channels?"
875        );
876        let mut headers = self.sendbird_headers()?;
877        use reqwest::header::HeaderValue;
878        headers.insert(
879            "content-type",
880            HeaderValue::from_static("application/x-www-form-urlencoded"),
881        );
882        log_request("POST", &url, &headers, Some(&payload));
883        let res = self
884            .http
885            .post(url)
886            .headers(headers)
887            .body(serde_json::to_string(&payload).unwrap_or_default())
888            .send()
889            .await?;
890        log::info!("[sendbird] POST /group_channels -> {}", res.status());
891        self.parse_response(res).await
892    }
893
894    pub async fn sendbird_get_or_create_dm_channel(
895        &mut self,
896        self_user_id: &str,
897        peer_user_id: &str,
898    ) -> Result<String, HingeError> {
899        // Try find existing channel containing exactly the two members
900        let q = format!(
901            "/users/{}/my_group_channels?&members_exactly_in={}&show_latest_message=false&distinct_mode=all&hidden_mode=unhidden_only&show_pinned_messages=false&show_metadata=true&member_state_filter=all&user_id={}&is_explicit_request=true&public_mode=all&include_left_channel=false&show_conversation=false&show_frozen=true&is_feed_channel=false&show_delivery_receipt=true&unread_filter=all&super_mode=all&show_member=true&show_read_receipt=true&order=chronological&show_empty=true&include_chat_notification=false&limit=1",
902            self_user_id, peer_user_id, self_user_id
903        );
904        self.ensure_sendbird_session().await?;
905        let res = self.sendbird_get(&q).await?;
906        let v: serde_json::Value = self.parse_response(res).await?;
907        if let Some(url) = v
908            .get("channels")
909            .and_then(|c| c.as_array())
910            .and_then(|arr| arr.first())
911            .and_then(|c| c.get("channel_url"))
912            .and_then(|s| s.as_str())
913        {
914            return Ok(url.to_string());
915        }
916        let created = self
917            .sendbird_create_distinct_dm(self_user_id, peer_user_id, 1)
918            .await?;
919        let url = created
920            .get("channel_url")
921            .and_then(|s| s.as_str())
922            .ok_or_else(|| HingeError::Http("missing channel_url in create response".into()))?;
923        Ok(url.to_string())
924    }
925
926    pub async fn ensure_sendbird_channel_with(
927        &mut self,
928        peer_user_id: &str,
929    ) -> Result<SendbirdChannelHandle, HingeError> {
930        let self_user_id = self
931            .hinge_auth
932            .as_ref()
933            .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
934            .identity_id
935            .clone();
936        let channel_url = self
937            .sendbird_get_or_create_dm_channel(&self_user_id, peer_user_id)
938            .await?;
939        Ok(SendbirdChannelHandle { channel_url })
940    }
941
942    pub async fn sendbird_init_flow(&mut self) -> Result<serde_json::Value, HingeError> {
943        // Health probe, user update is done by Hinge; we just list channels for the current user
944        self.ensure_sendbird_session().await?;
945        let user_id = self
946            .hinge_auth
947            .as_ref()
948            .ok_or_else(|| HingeError::Auth("hinge token missing".into()))?
949            .identity_id
950            .clone();
951        let res = self.sendbird_list_my_group_channels(&user_id, 20).await?;
952        Ok(res)
953    }
954
955    /// Return Sendbird credentials for the JS client (appId and token), ensuring auth
956    pub async fn sendbird_creds(&mut self) -> Result<serde_json::Value, HingeError> {
957        // Ensure we have Sendbird JWT from Hinge but do not start WS
958        if self.sendbird_auth.is_none() {
959            self.authenticate_with_sendbird().await?;
960        }
961        let app_id = self.settings.sendbird_app_id.clone();
962        let token = self
963            .sendbird_auth
964            .as_ref()
965            .map(|t| t.token.clone())
966            .unwrap_or_default();
967        Ok(serde_json::json!({
968            "appId": app_id,
969            "token": token
970        }))
971    }
972
973    // Open Sendbird WS and yield frames to a channel; also auto-respond to pings and allow READ commands
974    pub async fn sendbird_ws_subscribe(
975        &mut self,
976    ) -> Result<
977        (
978            tokio::sync::mpsc::UnboundedSender<String>,
979            tokio::sync::broadcast::Receiver<String>,
980        ),
981        HingeError,
982    > {
983        self.ensure_sendbird_session().await?;
984        let cmd = self
985            .sendbird_ws_cmd_tx
986            .as_ref()
987            .cloned()
988            .ok_or_else(|| HingeError::Http("sendbird ws not started".into()))?;
989        let tx = self
990            .sendbird_ws_broadcast_tx
991            .as_ref()
992            .cloned()
993            .ok_or_else(|| HingeError::Http("sendbird ws broadcast not available".into()))?;
994        let rx = tx.subscribe();
995        Ok((cmd, rx))
996    }
997
998    /// Send a raw command to the Sendbird WebSocket
999    pub async fn sendbird_ws_send_command(&mut self, command: String) -> Result<(), HingeError> {
1000        self.ensure_sendbird_session().await?;
1001        let tx = self
1002            .sendbird_ws_cmd_tx
1003            .as_ref()
1004            .cloned()
1005            .ok_or_else(|| HingeError::Http("sendbird ws not started".into()))?;
1006        tx.send(command)
1007            .map_err(|e| HingeError::Http(format!("Failed to send WS command: {}", e)))?;
1008        Ok(())
1009    }
1010
1011    /// Send a READ acknowledgment for a Sendbird channel (fire and forget)
1012    pub async fn sendbird_ws_send_read(&mut self, channel_url: &str) -> Result<(), HingeError> {
1013        let req_id = Uuid::new_v4().to_string().to_uppercase();
1014        let read_command = format!(
1015            r#"READ{{"req_id":"{}","channel_url":"{}"}}"#,
1016            req_id, channel_url
1017        );
1018        self.sendbird_ws_send_command(read_command).await
1019    }
1020
1021    /// Send a READ acknowledgment and wait for the response
1022    pub async fn sendbird_ws_send_read_and_wait(
1023        &mut self,
1024        channel_url: &str,
1025    ) -> Result<crate::models::SendbirdReadResponse, HingeError> {
1026        self.ensure_sendbird_session().await?;
1027
1028        // Generate request ID
1029        let req_id = Uuid::new_v4().to_string().to_uppercase();
1030
1031        // Create oneshot channel for response
1032        let (tx, rx) = tokio::sync::oneshot::channel();
1033
1034        // Register the pending request
1035        {
1036            let mut pending = self.sendbird_ws_pending_requests.lock().await;
1037            pending.insert(req_id.clone(), tx);
1038        }
1039
1040        // Send the READ command
1041        let read_command = format!(
1042            r#"READ{{"req_id":"{}","channel_url":"{}"}}"#,
1043            req_id, channel_url
1044        );
1045        self.sendbird_ws_send_command(read_command).await?;
1046
1047        // Wait for response with timeout
1048        match tokio::time::timeout(Duration::from_secs(5), rx).await {
1049            Ok(Ok(response)) => {
1050                // Parse the JSON response into our typed model
1051                parse_json_value_with_path(response)
1052                    .map_err(|e| HingeError::Http(format!("Failed to parse READ response: {}", e)))
1053            }
1054            Ok(Err(_)) => {
1055                // Channel was dropped, clean up
1056                let mut pending = self.sendbird_ws_pending_requests.lock().await;
1057                pending.remove(&req_id);
1058                Err(HingeError::Http("READ response channel dropped".into()))
1059            }
1060            Err(_) => {
1061                // Timeout, clean up
1062                let mut pending = self.sendbird_ws_pending_requests.lock().await;
1063                pending.remove(&req_id);
1064                Err(HingeError::Http("READ response timeout".into()))
1065            }
1066        }
1067    }
1068
1069    /// Send a PING to keep the WebSocket alive
1070    pub async fn sendbird_ws_send_ping(&mut self) -> Result<(), HingeError> {
1071        let req_id = Uuid::new_v4().to_string().to_uppercase();
1072        let ping_command = format!(r#"PING{{"req_id":"{}"}}"#, req_id);
1073        self.sendbird_ws_send_command(ping_command).await
1074    }
1075
1076    /// Send a TPST (Typing Start) command - fire and forget
1077    pub async fn sendbird_ws_send_typing_start(
1078        &mut self,
1079        channel_url: &str,
1080    ) -> Result<(), HingeError> {
1081        let timestamp = chrono::Utc::now().timestamp_millis();
1082        let tpst_command = format!(
1083            r#"TPST{{"req_id":null,"channel_url":"{}","time":{}}}"#,
1084            channel_url, timestamp
1085        );
1086        self.sendbird_ws_send_command(tpst_command).await
1087    }
1088
1089    /// Send a TPEN (Typing End) command - fire and forget
1090    pub async fn sendbird_ws_send_typing_end(
1091        &mut self,
1092        channel_url: &str,
1093    ) -> Result<(), HingeError> {
1094        let timestamp = chrono::Utc::now().timestamp_millis();
1095        let tpen_command = format!(
1096            r#"TPEN{{"req_id":null,"channel_url":"{}","time":{}}}"#,
1097            channel_url, timestamp
1098        );
1099        self.sendbird_ws_send_command(tpen_command).await
1100    }
1101
1102    /// Send an ENTR (Enter Channel) command - fire and forget
1103    pub async fn sendbird_ws_send_enter_channel(
1104        &mut self,
1105        channel_url: &str,
1106    ) -> Result<(), HingeError> {
1107        let entr_command = format!(r#"ENTR{{"req_id":null,"channel_url":"{}"}}"#, channel_url);
1108        self.sendbird_ws_send_command(entr_command).await
1109    }
1110
1111    /// Send an EXIT (Exit Channel) command - fire and forget
1112    pub async fn sendbird_ws_send_exit_channel(
1113        &mut self,
1114        channel_url: &str,
1115    ) -> Result<(), HingeError> {
1116        let exit_command = format!(r#"EXIT{{"req_id":null,"channel_url":"{}"}}"#, channel_url);
1117        self.sendbird_ws_send_command(exit_command).await
1118    }
1119
1120    /// Send a MACK (Message Acknowledgment) command - fire and forget
1121    pub async fn sendbird_ws_send_message_ack(
1122        &mut self,
1123        channel_url: &str,
1124        message_id: &str,
1125    ) -> Result<(), HingeError> {
1126        let mack_command = format!(
1127            r#"MACK{{"req_id":null,"channel_url":"{}","msg_id":"{}"}}"#,
1128            channel_url, message_id
1129        );
1130        self.sendbird_ws_send_command(mack_command).await
1131    }
1132
1133    /// Close the WebSocket connection with a specific code
1134    pub async fn sendbird_ws_close(
1135        &mut self,
1136        code: Option<u16>,
1137        reason: Option<String>,
1138    ) -> Result<(), HingeError> {
1139        // Send close frame if we have a command channel
1140        if let Some(ref tx) = self.sendbird_ws_cmd_tx {
1141            // Sendbird uses custom close codes like 40909
1142            let close_code = code.unwrap_or(1000); // 1000 = Normal Closure
1143            let close_reason = reason.unwrap_or_else(|| "Client initiated close".to_string());
1144
1145            // Send a close command through the channel
1146            // The writer task will handle converting this to a proper WebSocket Close frame
1147            let close_command = format!("__CLOSE__:{}:{}", close_code, close_reason);
1148            let _ = tx.send(close_command);
1149
1150            log::info!(
1151                "[sendbird ws] Closing connection with code {} reason: {}",
1152                close_code,
1153                close_reason
1154            );
1155        }
1156
1157        // Clear our state
1158        self.sendbird_ws_cmd_tx = None;
1159        self.sendbird_ws_broadcast_tx = None;
1160        self.sendbird_ws_connected = false;
1161
1162        // Clear any pending requests
1163        let mut pending = self.sendbird_ws_pending_requests.lock().await;
1164        pending.clear();
1165
1166        Ok(())
1167    }
1168
1169    /// Check if WebSocket is connected and reconnect if needed
1170    pub async fn sendbird_ws_ensure_connected(&mut self) -> Result<bool, HingeError> {
1171        // Check if we have an active WebSocket connection
1172        if self.sendbird_ws_cmd_tx.is_some() {
1173            // Try to send a ping to verify connection is alive
1174            if self.sendbird_ws_send_ping().await.is_ok() {
1175                return Ok(true);
1176            }
1177        }
1178
1179        // Connection is not active, clear the old state
1180        self.sendbird_ws_cmd_tx = None;
1181        self.sendbird_ws_broadcast_tx = None;
1182
1183        // Try to reconnect
1184        log::info!("[sendbird ws] Reconnecting WebSocket...");
1185        self.start_sendbird_ws().await?;
1186        Ok(true)
1187    }
1188}