use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, Result};
use base64::Engine;
use crate::guardian_client::GuardianClientConfig;
use futures::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
use serde_json::{json, Value};
use tokio::sync::mpsc;
use tokio_tungstenite::{connect_async, tungstenite::Message};
const WS_ENDPOINT: &str = "wss://ws-api.idun.cloud";
const PLATFORM: &str = "SDK_RUST";
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[derive(Debug, Clone)]
pub struct CloudDecodedEeg {
pub action: String,
pub data: Value,
pub sequence: Option<u64>,
}
#[derive(Debug, Clone, PartialEq)]
enum SessionState {
Disconnected,
Connected,
RecordingStarted {
recording_id: String,
},
RecordingOngoing {
recording_id: String,
},
Ended,
}
pub struct CloudDecoder {
api_token: String,
device_id: String,
state: SessionState,
ws_tx: Option<mpsc::Sender<String>>,
decoded_rx: Option<mpsc::Receiver<CloudDecodedEeg>>,
sequence: u64,
subscribed: bool,
}
impl CloudDecoder {
pub fn new(api_token: String, device_id: String) -> Self {
Self {
api_token,
device_id,
state: SessionState::Disconnected,
ws_tx: None,
decoded_rx: None,
sequence: 0,
subscribed: false,
}
}
pub fn from_env(device_id: String) -> Result<Self> {
let token = std::env::var("IDUN_API_TOKEN")
.map_err(|_| anyhow!("IDUN_API_TOKEN environment variable not set. \
Set it with: export IDUN_API_TOKEN=your-token"))?;
Ok(Self::new(token, device_id))
}
pub fn from_config(config: &GuardianClientConfig, device_id: String) -> Result<Self> {
match &config.api_token {
Some(token) => Ok(Self::new(token.clone(), device_id)),
None => Self::from_env(device_id),
}
}
pub async fn connect(&mut self) -> Result<()> {
let ws_url = format!("{}?authorization={}", WS_ENDPOINT, self.api_token);
info!("[CLOUD] Connecting to IDUN Cloud at {WS_ENDPOINT}…");
let (ws_stream, _response) = connect_async(&ws_url).await
.map_err(|e| anyhow!("Failed to connect to IDUN Cloud: {e}"))?;
info!("[CLOUD] WebSocket connected");
let (mut ws_write, mut ws_read) = ws_stream.split();
let (out_tx, mut out_rx) = mpsc::channel::<String>(256);
let (decoded_tx, decoded_rx) = mpsc::channel::<CloudDecodedEeg>(256);
self.ws_tx = Some(out_tx.clone());
self.decoded_rx = Some(decoded_rx);
self.state = SessionState::Connected;
tokio::spawn(async move {
while let Some(msg) = out_rx.recv().await {
if let Err(e) = ws_write.send(Message::Text(msg.into())).await {
error!("[CLOUD] WebSocket write error: {e}");
break;
}
}
debug!("[CLOUD] Writer task ended");
});
let state_tx = mpsc::channel::<(String, String)>(16);
let mut state_rx = state_tx.1;
let recording_state_tx = state_tx.0;
tokio::spawn(async move {
while let Some(msg) = ws_read.next().await {
match msg {
Ok(Message::Text(text)) => {
let json_str = if let Ok(decoded_bytes) =
base64::engine::general_purpose::STANDARD.decode(text.as_bytes())
{
String::from_utf8_lossy(&decoded_bytes).to_string()
} else {
text.to_string()
};
match serde_json::from_str::<Value>(&json_str) {
Ok(event) => {
let action = event
.get("action")
.and_then(|a| a.as_str())
.unwrap_or("")
.to_string();
match action.as_str() {
"recordingUpdate" => {
let status = event
.get("message")
.and_then(|m| m.get("status"))
.and_then(|s| s.as_str())
.unwrap_or("");
let rec_id = event
.get("message")
.and_then(|m| m.get("recordingId"))
.and_then(|r| r.as_str())
.unwrap_or("")
.to_string();
info!("[CLOUD] Recording update: status={status} id={rec_id}");
let _ = recording_state_tx
.send((status.to_string(), rec_id))
.await;
}
"liveStreamInsights" => {
let seq = event.get("sequence")
.and_then(|s| s.as_u64());
let _ = decoded_tx
.send(CloudDecodedEeg {
action: action.clone(),
data: event,
sequence: seq,
})
.await;
}
"realtimePredictionsResponse" => {
let seq = event.get("sequence")
.and_then(|s| s.as_u64());
let _ = decoded_tx
.send(CloudDecodedEeg {
action: action.clone(),
data: event,
sequence: seq,
})
.await;
}
"clientError" => {
let msg = event
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown error");
error!("[CLOUD] Client error: {msg}");
}
other => {
debug!("[CLOUD] Unhandled action: {other}");
}
}
}
Err(e) => {
debug!("[CLOUD] JSON parse error: {e} | raw: {json_str}");
}
}
}
Ok(Message::Binary(bin)) => {
if let Ok(decoded) =
base64::engine::general_purpose::STANDARD.decode(&bin)
{
let json_str = String::from_utf8_lossy(&decoded);
debug!("[CLOUD] Binary message: {json_str}");
}
}
Ok(Message::Close(_)) => {
info!("[CLOUD] WebSocket closed by server");
break;
}
Err(e) => {
error!("[CLOUD] WebSocket read error: {e}");
break;
}
_ => {}
}
}
debug!("[CLOUD] Reader task ended");
});
let start_msg = json!({
"version": 1,
"platform": PLATFORM,
"action": "startNewRecording",
"deviceId": self.device_id,
"deviceTs": now_ms(),
});
self.send_raw_json(&start_msg).await?;
info!("[CLOUD] Sent startNewRecording");
let timeout = tokio::time::Duration::from_secs(15);
let mut recording_id = String::new();
let deadline = tokio::time::Instant::now() + timeout;
loop {
tokio::select! {
Some((status, rec_id)) = state_rx.recv() => {
match status.as_str() {
"NOT_STARTED" => {
recording_id = rec_id;
info!("[CLOUD] Recording ID assigned: {recording_id}");
self.state = SessionState::RecordingStarted {
recording_id: recording_id.clone(),
};
}
"ONGOING" => {
if recording_id.is_empty() {
recording_id = rec_id;
}
info!("[CLOUD] Recording is ONGOING");
self.state = SessionState::RecordingOngoing {
recording_id: recording_id.clone(),
};
break;
}
"COMPLETED" | "FAILED" => {
warn!("[CLOUD] Recording ended with status: {status}");
self.state = SessionState::Ended;
return Err(anyhow!("Recording ended unexpectedly: {status}"));
}
_ => {
debug!("[CLOUD] Unexpected recording status: {status}");
}
}
}
_ = tokio::time::sleep_until(deadline) => {
if !recording_id.is_empty() {
info!("[CLOUD] Proceeding with recording ID (no ONGOING received)");
self.state = SessionState::RecordingOngoing {
recording_id: recording_id.clone(),
};
break;
}
return Err(anyhow!("Timed out waiting for recording ID from cloud"));
}
}
}
self.subscribe_live_insights(&recording_id).await?;
tokio::spawn(async move {
while state_rx.recv().await.is_some() {}
});
Ok(())
}
async fn subscribe_live_insights(&mut self, recording_id: &str) -> Result<()> {
let msg = json!({
"version": 1,
"platform": PLATFORM,
"action": "subscribeLiveStreamInsights",
"deviceId": self.device_id,
"deviceTs": now_ms(),
"recordingId": recording_id,
"streamsTypes": ["RAW_EEG", "FILTERED_EEG"],
});
self.send_raw_json(&msg).await?;
self.subscribed = true;
info!("[CLOUD] Subscribed to live stream insights (RAW_EEG, FILTERED_EEG)");
Ok(())
}
pub async fn send_raw_packet(
&mut self,
raw_data: &[u8],
device_ts: f64,
sequence: u64,
) -> Result<()> {
let recording_id = match &self.state {
SessionState::RecordingOngoing { recording_id } => recording_id.clone(),
SessionState::RecordingStarted { recording_id } => recording_id.clone(),
_ => return Err(anyhow!("Cloud session not active (state: {:?})", self.state)),
};
let b64 = base64::engine::general_purpose::STANDARD.encode(raw_data);
let msg = json!({
"version": 1,
"platform": PLATFORM,
"action": "publishRawMeasurements",
"deviceId": self.device_id,
"deviceTs": device_ts as u64,
"event": b64,
"recordingId": recording_id,
"sequence": sequence,
});
self.send_raw_json(&msg).await?;
self.sequence = sequence + 1;
Ok(())
}
pub fn try_recv_decoded(&mut self) -> Result<Option<CloudDecodedEeg>> {
if let Some(ref mut rx) = self.decoded_rx {
match rx.try_recv() {
Ok(decoded) => Ok(Some(decoded)),
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Disconnected) => {
Err(anyhow!("Cloud decoder channel closed"))
}
}
} else {
Ok(None)
}
}
pub async fn recv_decoded(&mut self) -> Result<Option<CloudDecodedEeg>> {
if let Some(ref mut rx) = self.decoded_rx {
Ok(rx.recv().await)
} else {
Ok(None)
}
}
async fn send_raw_json(&self, msg: &Value) -> Result<()> {
if let Some(ref tx) = self.ws_tx {
let text = serde_json::to_string(msg)?;
tx.send(text)
.await
.map_err(|e| anyhow!("Failed to send to cloud: {e}"))?;
Ok(())
} else {
Err(anyhow!("WebSocket not connected"))
}
}
pub async fn disconnect(&mut self) -> Result<()> {
let recording_id = match &self.state {
SessionState::RecordingOngoing { recording_id }
| SessionState::RecordingStarted { recording_id } => recording_id.clone(),
_ => {
self.state = SessionState::Disconnected;
self.ws_tx = None;
self.decoded_rx = None;
return Ok(());
}
};
let msg = json!({
"version": 1,
"platform": PLATFORM,
"action": "endOngoingRecording",
"deviceId": self.device_id,
"deviceTs": now_ms(),
"recordingId": recording_id,
});
if let Err(e) = self.send_raw_json(&msg).await {
warn!("[CLOUD] Error sending endOngoingRecording: {e}");
} else {
info!("[CLOUD] Sent endOngoingRecording for {recording_id}");
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
self.state = SessionState::Ended;
self.ws_tx = None;
self.decoded_rx = None;
self.subscribed = false;
info!("[CLOUD] Disconnected from IDUN Cloud");
Ok(())
}
pub fn is_connected(&self) -> bool {
matches!(
self.state,
SessionState::RecordingOngoing { .. } | SessionState::RecordingStarted { .. }
)
}
pub fn recording_id(&self) -> Option<&str> {
match &self.state {
SessionState::RecordingOngoing { recording_id }
| SessionState::RecordingStarted { recording_id } => Some(recording_id),
_ => None,
}
}
}