Skip to main content

emotiv_cortex_v2/
client.rs

1//! # Cortex WebSocket JSON-RPC Client
2//!
3//! Low-level transport for communicating with the Emotiv Cortex API.
4//! Handles WebSocket connection, TLS (self-signed cert for localhost),
5//! JSON-RPC request/response correlation, and the authentication flow.
6//!
7//! ## Architecture
8//!
9//! The WebSocket connection is split into reader/writer halves using
10//! `tokio-tungstenite`'s `StreamExt::split()`. This allows concurrent
11//! API calls and data streaming on the same WebSocket:
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────┐
15//! │                 CortexClient                     │
16//! │                                                  │
17//! │  writer: Arc<Mutex<SplitSink>>  ◄── call()       │
18//! │                                  ◄── subscribe() │
19//! │                                                  │
20//! │  reader_loop (spawned task):                     │
21//! │    SplitStream ─┬─► RPC response → oneshot tx    │
22//! │                 ├─► eeg event    → eeg_tx        │
23//! │                 ├─► dev event    → dev_tx        │
24//! │                 ├─► mot event    → mot_tx        │
25//! │                 └─► pow event    → pow_tx        │
26//! └─────────────────────────────────────────────────┘
27//! ```
28//!
29//! ## TLS Note
30//!
31//! The Emotiv Cortex service runs at `wss://localhost:6868` with a
32//! self-signed TLS certificate. TLS backend selection is feature-driven:
33//! `rustls-tls` (default) or `native-tls` (opt-in).
34//!
35//! ## Method Contract Template
36//!
37//! Public methods in this module document:
38//! - Cortex method name
39//! - required state (connection/auth/session)
40//! - parameter semantics
41//! - return shape and parsing behavior
42//! - error propagation and retry/idempotency notes
43
44use std::collections::HashMap;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
48
49use futures_util::{SinkExt, StreamExt, stream::SplitSink, stream::SplitStream};
50#[cfg(all(feature = "native-tls", not(feature = "rustls-tls")))]
51use native_tls::TlsConnector as NativeTlsConnector;
52#[cfg(feature = "rustls-tls")]
53use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
54#[cfg(feature = "rustls-tls")]
55use rustls::{DigitallySignedStruct, Error as RustlsError, SignatureScheme};
56#[cfg(feature = "rustls-tls")]
57use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
58use tokio::net::TcpStream;
59use tokio::sync::mpsc::error::TrySendError;
60use tokio::sync::{Mutex, mpsc, oneshot};
61use tokio::task::JoinHandle;
62#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
63use tokio_tungstenite::connect_async_tls_with_config;
64#[cfg(not(any(feature = "native-tls", feature = "rustls-tls")))]
65use tokio_tungstenite::tungstenite::error::UrlError;
66use tokio_tungstenite::{
67    Connector, MaybeTlsStream, WebSocketStream,
68    tungstenite::{Message, http},
69};
70
71use crate::config::CortexConfig;
72use crate::error::{CortexError, CortexResult};
73use crate::protocol::auth::UserLoginInfo;
74use crate::protocol::constants::{Methods, Streams};
75use crate::protocol::headset::{
76    ConfigMappingListValue, ConfigMappingMode, ConfigMappingRequest, ConfigMappingResponse,
77    ConfigMappingValue, HeadsetClockSyncResult, HeadsetInfo, QueryHeadsetsOptions,
78};
79use crate::protocol::profiles::{CurrentProfileInfo, ProfileAction, ProfileInfo};
80use crate::protocol::records::{ExportFormat, MarkerInfo, RecordInfo, UpdateRecordRequest};
81use crate::protocol::rpc::{CortexRequest, CortexResponse};
82use crate::protocol::session::SessionInfo;
83use crate::protocol::subjects::{
84    DemographicAttribute, QuerySubjectsRequest, SubjectInfo, SubjectRequest,
85};
86use crate::protocol::training::{
87    DetectionInfo, DetectionType, FacialExpressionSignatureTypeRequest,
88    FacialExpressionThresholdRequest, MentalCommandTrainingThresholdRequest,
89    TrainedSignatureActions, TrainingStatus, TrainingTime,
90};
91
92/// Connection timeout for the initial WebSocket handshake.
93const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
94
95/// Channel buffer size for data stream events.
96const STREAM_CHANNEL_BUFFER: usize = 1024;
97
98type ConnectOutput = Result<
99    (
100        WebSocketStream<MaybeTlsStream<TcpStream>>,
101        tokio_tungstenite::tungstenite::handshake::client::Response,
102    ),
103    tokio_tungstenite::tungstenite::Error,
104>;
105
106#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
107fn connect_websocket(
108    uri: http::Uri,
109    connector: Option<Connector>,
110) -> impl std::future::Future<Output = ConnectOutput> {
111    connect_async_tls_with_config(
112        uri, None, // WebSocket config
113        true, // disable_nagle
114        connector,
115    )
116}
117
118#[cfg(not(any(feature = "native-tls", feature = "rustls-tls")))]
119async fn connect_websocket(_uri: http::Uri, _connector: Option<Connector>) -> ConnectOutput {
120    Err(tokio_tungstenite::tungstenite::Error::Url(
121        UrlError::TlsFeatureNotEnabled,
122    ))
123}
124
125#[cfg(all(feature = "native-tls", not(feature = "rustls-tls")))]
126fn build_tls_connector(config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
127    let tls_connector = NativeTlsConnector::builder()
128        .danger_accept_invalid_certs(config.should_accept_invalid_certs())
129        .build()
130        .map_err(|e| CortexError::ConnectionFailed {
131            url: url.to_string(),
132            reason: format!("TLS configuration failed: {}", e),
133        })?;
134
135    Ok(Some(Connector::NativeTls(tls_connector)))
136}
137
138#[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
139fn build_tls_connector(config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
140    let _: http::Uri =
141        url.parse()
142            .map_err(|e: http::uri::InvalidUri| CortexError::ConnectionFailed {
143                url: url.to_string(),
144                reason: format!("Invalid URL: {e}"),
145            })?;
146
147    if !config.should_accept_invalid_certs() {
148        return Ok(None);
149    }
150
151    let tls_config = rustls::ClientConfig::builder()
152        .dangerous()
153        .with_custom_certificate_verifier(Arc::new(InsecureCertVerifier))
154        .with_no_client_auth();
155
156    Ok(Some(Connector::Rustls(Arc::new(tls_config))))
157}
158
159#[cfg(any(
160    all(feature = "native-tls", feature = "rustls-tls"),
161    all(not(feature = "native-tls"), not(feature = "rustls-tls"))
162))]
163fn build_tls_connector(_config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
164    let _: http::Uri =
165        url.parse()
166            .map_err(|e: http::uri::InvalidUri| CortexError::ConnectionFailed {
167                url: url.to_string(),
168                reason: format!("Invalid URL: {e}"),
169            })?;
170    Ok(None)
171}
172
173#[cfg(feature = "rustls-tls")]
174#[derive(Debug)]
175struct InsecureCertVerifier;
176
177#[cfg(feature = "rustls-tls")]
178impl ServerCertVerifier for InsecureCertVerifier {
179    fn verify_server_cert(
180        &self,
181        _end_entity: &CertificateDer<'_>,
182        _intermediates: &[CertificateDer<'_>],
183        _server_name: &ServerName<'_>,
184        _ocsp_response: &[u8],
185        _now: UnixTime,
186    ) -> Result<ServerCertVerified, RustlsError> {
187        Ok(ServerCertVerified::assertion())
188    }
189
190    fn verify_tls12_signature(
191        &self,
192        _message: &[u8],
193        _cert: &CertificateDer<'_>,
194        _dss: &DigitallySignedStruct,
195    ) -> Result<HandshakeSignatureValid, RustlsError> {
196        Ok(HandshakeSignatureValid::assertion())
197    }
198
199    fn verify_tls13_signature(
200        &self,
201        _message: &[u8],
202        _cert: &CertificateDer<'_>,
203        _dss: &DigitallySignedStruct,
204    ) -> Result<HandshakeSignatureValid, RustlsError> {
205        Ok(HandshakeSignatureValid::assertion())
206    }
207
208    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
209        vec![
210            SignatureScheme::ECDSA_NISTP384_SHA384,
211            SignatureScheme::ECDSA_NISTP256_SHA256,
212            SignatureScheme::RSA_PSS_SHA512,
213            SignatureScheme::RSA_PSS_SHA384,
214            SignatureScheme::RSA_PSS_SHA256,
215            SignatureScheme::ED25519,
216            SignatureScheme::RSA_PKCS1_SHA512,
217            SignatureScheme::RSA_PKCS1_SHA384,
218            SignatureScheme::RSA_PKCS1_SHA256,
219        ]
220    }
221}
222
223/// Type alias for the write half of the WebSocket connection.
224type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
225
226/// Type alias for the read half of the WebSocket connection.
227type WsReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
228
229/// A pending RPC response awaiting its matching JSON-RPC response by `id`.
230type PendingResponse = oneshot::Sender<CortexResult<serde_json::Value>>;
231
232/// Senders for dispatching stream data events to consumers.
233pub type StreamSenders = HashMap<&'static str, mpsc::Sender<serde_json::Value>>;
234
235/// Receivers for consuming stream data events.
236pub type StreamReceivers = HashMap<&'static str, mpsc::Receiver<serde_json::Value>>;
237
238/// Snapshot of stream dispatch behavior for one stream key.
239#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
240pub struct StreamDispatchStats {
241    /// Number of events successfully queued to the stream channel.
242    pub delivered: u64,
243    /// Number of events dropped because the channel was full.
244    pub dropped_full: u64,
245    /// Number of events dropped because the channel was closed.
246    pub dropped_closed: u64,
247}
248
249#[derive(Debug, Default)]
250struct StreamDispatchCounters {
251    delivered: AtomicU64,
252    dropped_full: AtomicU64,
253    dropped_closed: AtomicU64,
254}
255
256impl StreamDispatchCounters {
257    fn snapshot(&self) -> StreamDispatchStats {
258        StreamDispatchStats {
259            delivered: self.delivered.load(Ordering::Relaxed),
260            dropped_full: self.dropped_full.load(Ordering::Relaxed),
261            dropped_closed: self.dropped_closed.load(Ordering::Relaxed),
262        }
263    }
264}
265
266type StreamDispatchCounterMap = HashMap<&'static str, Arc<StreamDispatchCounters>>;
267
268/// WebSocket JSON-RPC client for the Emotiv Cortex API.
269///
270/// This client manages a single WebSocket connection, split into reader
271/// and writer halves. The writer is shared (behind `Arc<Mutex>`) so that
272/// API calls can be made concurrently with data streaming. The reader
273/// runs in a background task that dispatches:
274///
275/// - **RPC responses** → matched by `id` to pending `oneshot` channels
276/// - **Data events** → routed by stream type to `mpsc` channels
277pub struct CortexClient {
278    /// Shared write half of the WebSocket.
279    writer: Arc<Mutex<WsWriter>>,
280
281    /// Map of pending RPC requests awaiting responses, keyed by request ID.
282    pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>>,
283
284    /// Auto-incrementing request ID counter.
285    next_id: AtomicU64,
286
287    /// Handle to the background reader loop task.
288    reader_handle: Option<JoinHandle<()>>,
289
290    /// Whether the reader loop is currently running.
291    reader_running: Arc<AtomicBool>,
292
293    /// Shutdown signal for the reader loop.
294    reader_shutdown: tokio::sync::watch::Sender<bool>,
295
296    /// Shared stream senders, dynamically updatable without restarting
297    /// the reader loop. The reader holds a clone of this Arc and checks
298    /// it on each data message.
299    stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>>,
300
301    /// Per-stream dispatch counters for backpressure/drop observability.
302    stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
303
304    /// RPC call timeout (from config).
305    rpc_timeout: Duration,
306
307    /// Monotonic clock origin used for `syncWithHeadsetClock`.
308    clock_origin: Instant,
309}
310
311impl CortexClient {
312    /// Connect to the Cortex API WebSocket service.
313    ///
314    /// The Cortex service must be running on the local machine.
315    /// TLS is configured based on the [`CortexConfig`] settings.
316    ///
317    /// # Examples
318    ///
319    /// ```no_run
320    /// use emotiv_cortex_v2::{CortexClient, CortexConfig};
321    ///
322    /// # async fn demo() -> emotiv_cortex_v2::CortexResult<()> {
323    /// let config = CortexConfig::discover(None)?;
324    /// let mut client = CortexClient::connect(&config).await?;
325    ///
326    /// let info = client.get_cortex_info().await?;
327    /// client.disconnect().await?;
328    /// # Ok(())
329    /// # }
330    /// ```
331    ///
332    /// # Errors
333    /// Returns any error produced by the underlying Cortex API call,
334    /// including connection, authentication, protocol, timeout, and configuration errors.
335    pub async fn connect(config: &CortexConfig) -> CortexResult<Self> {
336        let url = &config.cortex_url;
337        let rpc_timeout = Duration::from_secs(config.timeouts.rpc_timeout_secs);
338        let connector = build_tls_connector(config, url)?;
339
340        // Parse the WebSocket URL as a URI for the connection.
341        let uri: http::Uri =
342            url.parse()
343                .map_err(|e: http::uri::InvalidUri| CortexError::ConnectionFailed {
344                    url: url.clone(),
345                    reason: format!("Invalid URL: {e}"),
346                })?;
347
348        let connect_fut = connect_websocket(uri, connector);
349
350        let (ws, response) = tokio::time::timeout(CONNECT_TIMEOUT, connect_fut)
351            .await
352            .map_err(|_| CortexError::Timeout { seconds: 5 })?
353            .map_err(|e| CortexError::ConnectionFailed {
354                url: url.clone(),
355                reason: format!("WebSocket connection failed: {e}"),
356            })?;
357
358        tracing::info!(url, status = %response.status(), "Connected to Cortex API");
359
360        // Split the WebSocket into reader and writer halves.
361        let (writer, reader) = ws.split();
362
363        let pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>> =
364            Arc::new(Mutex::new(HashMap::new()));
365
366        let reader_running = Arc::new(AtomicBool::new(true));
367        let (reader_shutdown, reader_shutdown_rx) = tokio::sync::watch::channel(false);
368        let stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>> =
369            Arc::new(std::sync::Mutex::new(None));
370        let stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>> =
371            Arc::new(std::sync::Mutex::new(HashMap::new()));
372
373        // Start the reader loop immediately — it needs to be running before
374        // any API calls so that responses can be dispatched.
375        let reader_handle = Self::spawn_reader_loop(
376            reader,
377            Arc::clone(&pending_responses),
378            Arc::clone(&reader_running),
379            Arc::clone(&stream_senders),
380            Arc::clone(&stream_dispatch_counters),
381            reader_shutdown_rx,
382        );
383
384        Ok(Self {
385            writer: Arc::new(Mutex::new(writer)),
386            pending_responses,
387            next_id: AtomicU64::new(1),
388            reader_handle: Some(reader_handle),
389            reader_running,
390            reader_shutdown,
391            stream_senders,
392            stream_dispatch_counters,
393            rpc_timeout,
394            clock_origin: Instant::now(),
395        })
396    }
397
398    /// Connect to the Cortex API using just a URL (convenience for simple use cases).
399    ///
400    /// Uses default timeouts and localhost TLS settings.
401    ///
402    /// # Errors
403    /// Returns any error produced by the underlying Cortex API call,
404    /// including connection, authentication, protocol, timeout, and configuration errors.
405    pub async fn connect_url(url: &str) -> CortexResult<Self> {
406        let config = CortexConfig {
407            client_id: String::new(),
408            client_secret: String::new(),
409            cortex_url: url.to_string(),
410            ..CortexConfig::new("", "")
411        };
412        Self::connect(&config).await
413    }
414
415    /// Spawn the background reader loop that dispatches WebSocket messages.
416    fn spawn_reader_loop(
417        mut reader: WsReader,
418        pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>>,
419        running: Arc<AtomicBool>,
420        stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>>,
421        stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
422        mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
423    ) -> JoinHandle<()> {
424        tokio::spawn(async move {
425            while running.load(Ordering::SeqCst) {
426                let msg = tokio::select! {
427                    msg = reader.next() => msg,
428                    changed = shutdown_rx.changed() => {
429                        match changed {
430                            Ok(()) if *shutdown_rx.borrow() => break,
431                            Ok(()) => continue,
432                            Err(_) => break,
433                        }
434                    },
435                };
436
437                match msg {
438                    Some(Ok(Message::Text(text))) => {
439                        Self::handle_text_message(
440                            &text,
441                            &pending_responses,
442                            &stream_senders,
443                            &stream_dispatch_counters,
444                        )
445                        .await;
446                    }
447                    Some(Ok(Message::Close(_))) => {
448                        tracing::info!("Cortex WebSocket closed by server");
449                        Self::drain_pending_connection_lost(
450                            &pending_responses,
451                            "Cortex WebSocket closed",
452                        )
453                        .await;
454                        break;
455                    }
456                    Some(Err(e)) => {
457                        tracing::warn!("WebSocket read error: {}", e);
458                        Self::drain_pending_websocket(
459                            &pending_responses,
460                            format!("WebSocket error: {e}"),
461                        )
462                        .await;
463                        break;
464                    }
465                    None => {
466                        tracing::info!("Cortex WebSocket stream ended");
467                        break;
468                    }
469                    _ => {
470                        // Binary messages, pings, pongs — skip
471                    }
472                }
473            }
474
475            Self::drain_pending_connection_lost(&pending_responses, "Reader loop stopped").await;
476
477            tracing::debug!("Reader loop exiting");
478            running.store(false, Ordering::SeqCst);
479        })
480    }
481
482    async fn handle_text_message(
483        text: &str,
484        pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
485        stream_senders: &Arc<std::sync::Mutex<Option<StreamSenders>>>,
486        stream_dispatch_counters: &Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
487    ) {
488        tracing::debug!(raw = %text, "Reader loop received message");
489
490        let value: serde_json::Value = match serde_json::from_str(text) {
491            Ok(v) => v,
492            Err(e) => {
493                tracing::warn!("Failed to parse WebSocket message as JSON: {}", e);
494                return;
495            }
496        };
497
498        if value
499            .get("id")
500            .and_then(serde_json::Value::as_u64)
501            .is_some()
502        {
503            let _ = Self::dispatch_rpc_response(value, pending_responses).await;
504            return;
505        }
506
507        Self::dispatch_stream_event(value, stream_senders, stream_dispatch_counters);
508    }
509
510    async fn dispatch_rpc_response(
511        value: serde_json::Value,
512        pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
513    ) -> bool {
514        let Some(id) = value.get("id").and_then(serde_json::Value::as_u64) else {
515            return false;
516        };
517
518        let response: std::result::Result<CortexResponse, _> = serde_json::from_value(value);
519
520        let mut pending = pending_responses.lock().await;
521        if let Some(tx) = pending.remove(&id) {
522            match response {
523                Ok(resp) => {
524                    let result = if let Some(error) = resp.error {
525                        tracing::error!(
526                            id,
527                            code = error.code,
528                            message = %error.message,
529                            "Cortex API error in RPC response",
530                        );
531                        Err(CortexError::from_api_error(error.code, error.message))
532                    } else {
533                        resp.result.ok_or_else(|| CortexError::ProtocolError {
534                            reason: "Response has no result or error".into(),
535                        })
536                    };
537                    let _ = tx.send(result);
538                }
539                Err(e) => {
540                    let _ = tx.send(Err(CortexError::ProtocolError {
541                        reason: format!("Failed to parse RPC response: {e}"),
542                    }));
543                }
544            }
545        } else {
546            tracing::debug!(id, "Received response for unknown request ID");
547        }
548
549        true
550    }
551
552    fn dispatch_stream_event(
553        value: serde_json::Value,
554        stream_senders: &Arc<std::sync::Mutex<Option<StreamSenders>>>,
555        stream_dispatch_counters: &Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
556    ) {
557        let target_sender = if let Ok(guard) = stream_senders.lock() {
558            guard.as_ref().and_then(|senders| {
559                senders
560                    .iter()
561                    .find_map(|(key, tx)| value.get(*key).is_some().then(|| (*key, tx.clone())))
562            })
563        } else {
564            None
565        };
566
567        if let Some((stream_key, tx)) = target_sender {
568            let counter = stream_dispatch_counters
569                .lock()
570                .ok()
571                .and_then(|counters| counters.get(stream_key).cloned());
572
573            match tx.try_send(value) {
574                Ok(()) => {
575                    if let Some(counter) = counter {
576                        counter.delivered.fetch_add(1, Ordering::Relaxed);
577                    }
578                }
579                Err(TrySendError::Full(_)) => {
580                    if let Some(counter) = counter {
581                        counter.dropped_full.fetch_add(1, Ordering::Relaxed);
582                    }
583                }
584                Err(TrySendError::Closed(_)) => {
585                    if let Some(counter) = counter {
586                        counter.dropped_closed.fetch_add(1, Ordering::Relaxed);
587                    }
588                }
589            }
590        }
591    }
592
593    async fn drain_pending_connection_lost(
594        pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
595        reason: &str,
596    ) {
597        let mut pending = pending_responses.lock().await;
598        for (_, tx) in pending.drain() {
599            let _ = tx.send(Err(CortexError::ConnectionLost {
600                reason: reason.to_string(),
601            }));
602        }
603    }
604
605    async fn drain_pending_websocket(
606        pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
607        reason: String,
608    ) {
609        let mut pending = pending_responses.lock().await;
610        for (_, tx) in pending.drain() {
611            let _ = tx.send(Err(CortexError::WebSocket(reason.clone())));
612        }
613    }
614
615    // ─── Core RPC ───────────────────────────────────────────────────────
616
617    /// Send a JSON-RPC request and wait for the matching response.
618    async fn call(
619        &self,
620        method: &'static str,
621        params: serde_json::Value,
622    ) -> CortexResult<serde_json::Value> {
623        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
624        let request = CortexRequest::new(id, method, params);
625
626        let json = serde_json::to_string(&request).map_err(|e| CortexError::ProtocolError {
627            reason: format!("serialize error: {e}"),
628        })?;
629
630        tracing::debug!(method, id, json = %json, "Sending Cortex request");
631
632        // Register the pending response before sending
633        let (tx, rx) = oneshot::channel();
634        {
635            let mut pending = self.pending_responses.lock().await;
636            pending.insert(id, tx);
637        }
638
639        // Send the request via the shared writer
640        let send_result = {
641            let mut writer = self.writer.lock().await;
642            writer.send(Message::Text(json.into())).await
643        };
644        if let Err(e) = send_result {
645            let mut pending = self.pending_responses.lock().await;
646            pending.remove(&id);
647            return Err(CortexError::WebSocket(format!("Send error: {e}")));
648        }
649
650        // Wait for the reader loop to deliver the response
651        let timeout_secs = self.rpc_timeout.as_secs();
652        let result = match tokio::time::timeout(self.rpc_timeout, rx).await {
653            Ok(Ok(response)) => response,
654            Ok(Err(_)) => {
655                return Err(CortexError::ConnectionLost {
656                    reason: "Response channel dropped (reader loop died)".into(),
657                });
658            }
659            Err(_) => {
660                self.pending_responses.lock().await.remove(&id);
661                return Err(CortexError::Timeout {
662                    seconds: timeout_secs,
663                });
664            }
665        }?;
666
667        tracing::debug!(method, id, "Cortex RPC succeeded");
668        Ok(result)
669    }
670
671    fn query_headsets_params(options: QueryHeadsetsOptions) -> serde_json::Value {
672        let mut params = serde_json::json!({});
673        if let Some(id) = options.id {
674            params["id"] = serde_json::json!(id);
675        }
676        if options.include_flex_mappings {
677            params["includeFlexMappings"] = serde_json::json!(true);
678        }
679        params
680    }
681
682    fn sync_with_headset_clock_params(&self, headset_id: &str) -> CortexResult<serde_json::Value> {
683        let monotonic_time = self.clock_origin.elapsed().as_secs_f64();
684        let system_time = Self::current_epoch_millis()?;
685
686        Ok(Self::sync_with_headset_clock_params_with_times(
687            headset_id,
688            monotonic_time,
689            system_time,
690        ))
691    }
692
693    fn current_epoch_millis() -> CortexResult<u64> {
694        let system_duration = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
695            CortexError::ProtocolError {
696                reason: format!("System clock is before UNIX epoch: {e}"),
697            }
698        })?;
699
700        u64::try_from(system_duration.as_millis()).map_err(|_| CortexError::ProtocolError {
701            reason: "System time in milliseconds exceeds u64 range".into(),
702        })
703    }
704
705    fn sync_with_headset_clock_params_with_times(
706        headset_id: &str,
707        monotonic_time: f64,
708        system_time: u64,
709    ) -> serde_json::Value {
710        serde_json::json!({
711            "headset": headset_id,
712            "monotonicTime": monotonic_time,
713            "systemTime": system_time,
714        })
715    }
716
717    fn config_mapping_params(
718        cortex_token: &str,
719        request: ConfigMappingRequest,
720    ) -> CortexResult<(ConfigMappingMode, serde_json::Value)> {
721        let mut params = serde_json::json!({
722            "cortexToken": cortex_token,
723            "status": request.mode().as_str(),
724        });
725
726        match request {
727            ConfigMappingRequest::Create { name, mappings } => {
728                if name.trim().is_empty() {
729                    return Err(CortexError::ProtocolError {
730                        reason: "configMapping create requires non-empty name".into(),
731                    });
732                }
733                if !mappings.is_object() {
734                    return Err(CortexError::ProtocolError {
735                        reason: "configMapping create requires mappings as an object".into(),
736                    });
737                }
738                params["name"] = serde_json::json!(name);
739                params["mappings"] = mappings;
740                Ok((ConfigMappingMode::Create, params))
741            }
742            ConfigMappingRequest::Get => Ok((ConfigMappingMode::Get, params)),
743            ConfigMappingRequest::Read { uuid } => {
744                if uuid.trim().is_empty() {
745                    return Err(CortexError::ProtocolError {
746                        reason: "configMapping read requires non-empty uuid".into(),
747                    });
748                }
749                params["uuid"] = serde_json::json!(uuid);
750                Ok((ConfigMappingMode::Read, params))
751            }
752            ConfigMappingRequest::Update {
753                uuid,
754                name,
755                mappings,
756            } => {
757                if uuid.trim().is_empty() {
758                    return Err(CortexError::ProtocolError {
759                        reason: "configMapping update requires non-empty uuid".into(),
760                    });
761                }
762                if name.is_none() && mappings.is_none() {
763                    return Err(CortexError::ProtocolError {
764                        reason: "configMapping update requires name and/or mappings".into(),
765                    });
766                }
767                if mappings.as_ref().is_some_and(|m| !m.is_object()) {
768                    return Err(CortexError::ProtocolError {
769                        reason: "configMapping update requires mappings as an object".into(),
770                    });
771                }
772
773                params["uuid"] = serde_json::json!(uuid);
774                if let Some(value) = name {
775                    if value.trim().is_empty() {
776                        return Err(CortexError::ProtocolError {
777                            reason: "configMapping update name must be non-empty".into(),
778                        });
779                    }
780                    params["name"] = serde_json::json!(value);
781                }
782                if let Some(value) = mappings {
783                    params["mappings"] = value;
784                }
785                Ok((ConfigMappingMode::Update, params))
786            }
787            ConfigMappingRequest::Delete { uuid } => {
788                if uuid.trim().is_empty() {
789                    return Err(CortexError::ProtocolError {
790                        reason: "configMapping delete requires non-empty uuid".into(),
791                    });
792                }
793                params["uuid"] = serde_json::json!(uuid);
794                Ok((ConfigMappingMode::Delete, params))
795            }
796        }
797    }
798
799    fn update_headset_custom_info_params(
800        cortex_token: &str,
801        headset_id: &str,
802        headband_position: Option<&str>,
803        custom_name: Option<&str>,
804    ) -> serde_json::Value {
805        let mut params = serde_json::json!({
806            "cortexToken": cortex_token,
807            "headsetId": headset_id,
808            // Backward-compat for older deployments.
809            "headset": headset_id,
810        });
811
812        if let Some(pos) = headband_position {
813            params["headbandPosition"] = serde_json::json!(pos);
814        }
815        if let Some(name) = custom_name {
816            params["customName"] = serde_json::json!(name);
817        }
818        params
819    }
820
821    fn mental_command_training_threshold_params(
822        cortex_token: &str,
823        session_id: Option<&str>,
824        profile: Option<&str>,
825        status: Option<&str>,
826        value: Option<f64>,
827    ) -> CortexResult<serde_json::Value> {
828        match (session_id, profile) {
829            (Some(_), Some(_)) => {
830                return Err(CortexError::ProtocolError {
831                    reason: "Specify either session_id or profile, not both".into(),
832                });
833            }
834            (None, None) => {
835                return Err(CortexError::ProtocolError {
836                    reason: "Specify either session_id or profile".into(),
837                });
838            }
839            _ => {}
840        }
841
842        let inferred_status = status.unwrap_or(if value.is_some() { "set" } else { "get" });
843
844        let mut params = serde_json::json!({
845            "cortexToken": cortex_token,
846            "status": inferred_status,
847        });
848
849        if let Some(session) = session_id {
850            params["session"] = serde_json::json!(session);
851        }
852        if let Some(profile_name) = profile {
853            params["profile"] = serde_json::json!(profile_name);
854        }
855        if let Some(threshold) = value {
856            params["value"] = serde_json::json!(threshold);
857        }
858        Ok(params)
859    }
860
861    fn subject_params(cortex_token: &str, request: &SubjectRequest) -> serde_json::Value {
862        let mut params = serde_json::json!({
863            "cortexToken": cortex_token,
864            "subjectName": request.subject_name.as_str(),
865        });
866
867        if let Some(dob) = &request.date_of_birth {
868            params["dateOfBirth"] = serde_json::json!(dob);
869        }
870        if let Some(sex) = &request.sex {
871            params["sex"] = serde_json::json!(sex);
872        }
873        if let Some(country_code) = &request.country_code {
874            params["countryCode"] = serde_json::json!(country_code);
875        }
876        if let Some(state) = &request.state {
877            params["state"] = serde_json::json!(state);
878        }
879        if let Some(city) = &request.city {
880            params["city"] = serde_json::json!(city);
881        }
882        if let Some(attributes) = &request.attributes {
883            params["attributes"] = serde_json::json!(attributes);
884        }
885
886        params
887    }
888
889    fn query_subjects_params(
890        cortex_token: &str,
891        request: &QuerySubjectsRequest,
892    ) -> serde_json::Value {
893        let mut params = serde_json::json!({
894            "cortexToken": cortex_token,
895            "query": request.query.clone(),
896            "orderBy": request.order_by.clone(),
897        });
898
899        if let Some(limit) = request.limit {
900            params["limit"] = serde_json::json!(limit);
901        }
902        if let Some(offset) = request.offset {
903            params["offset"] = serde_json::json!(offset);
904        }
905
906        params
907    }
908
909    fn facial_expression_signature_type_params(
910        cortex_token: &str,
911        request: &FacialExpressionSignatureTypeRequest,
912    ) -> serde_json::Value {
913        let mut params = serde_json::json!({
914            "cortexToken": cortex_token,
915            "status": request.status.as_str(),
916        });
917
918        if let Some(profile) = &request.profile {
919            params["profile"] = serde_json::json!(profile);
920        }
921        if let Some(session) = &request.session {
922            params["session"] = serde_json::json!(session);
923        }
924        if let Some(signature) = &request.signature {
925            params["signature"] = serde_json::json!(signature);
926        }
927
928        params
929    }
930
931    fn facial_expression_threshold_params(
932        cortex_token: &str,
933        request: &FacialExpressionThresholdRequest,
934    ) -> serde_json::Value {
935        let mut params = serde_json::json!({
936            "cortexToken": cortex_token,
937            "status": request.status.as_str(),
938            "action": request.action.as_str(),
939        });
940
941        if let Some(profile) = &request.profile {
942            params["profile"] = serde_json::json!(profile);
943        }
944        if let Some(session) = &request.session {
945            params["session"] = serde_json::json!(session);
946        }
947        if let Some(value) = request.value {
948            params["value"] = serde_json::json!(value);
949        }
950
951        params
952    }
953
954    // ─── Streaming ──────────────────────────────────────────────────────
955
956    /// Stream name validation and mapping to static keys.
957    fn stream_key(name: &str) -> &'static str {
958        match name {
959            Streams::EEG => "eeg",
960            Streams::DEV => "dev",
961            Streams::MOT => "mot",
962            Streams::EQ => "eq",
963            Streams::POW => "pow",
964            Streams::MET => "met",
965            Streams::COM => "com",
966            Streams::FAC => "fac",
967            Streams::SYS => "sys",
968            other => {
969                tracing::warn!(stream = other, "Unknown stream type");
970                "unknown"
971            }
972        }
973    }
974
975    /// Create data stream channels for the specified streams.
976    ///
977    /// This replaces ALL existing stream channels. Call before
978    /// [`subscribe_streams`](Self::subscribe_streams).
979    pub fn create_stream_channels(&self, streams: &[&str]) -> StreamReceivers {
980        let mut senders = StreamSenders::new();
981        let mut receivers = StreamReceivers::new();
982        let mut counters = StreamDispatchCounterMap::new();
983
984        for &stream in streams {
985            let stream_key = Self::stream_key(stream);
986            let (tx, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
987            senders.insert(stream_key, tx);
988            receivers.insert(stream_key, rx);
989            counters.insert(stream_key, Arc::new(StreamDispatchCounters::default()));
990        }
991
992        if let Ok(mut guard) = self.stream_senders.lock() {
993            *guard = Some(senders);
994        }
995        if let Ok(mut guard) = self.stream_dispatch_counters.lock() {
996            *guard = counters;
997        }
998
999        receivers
1000    }
1001
1002    /// Add a single stream channel without disturbing existing ones.
1003    ///
1004    /// Returns a receiver for the new channel.
1005    pub fn add_stream_channel(&self, stream: &str) -> Option<mpsc::Receiver<serde_json::Value>> {
1006        let stream_key = Self::stream_key(stream);
1007        let (tx, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
1008        if let Ok(mut guard) = self.stream_senders.lock() {
1009            let senders = guard.get_or_insert_with(StreamSenders::new);
1010            senders.insert(stream_key, tx);
1011            if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1012                counters
1013                    .entry(stream_key)
1014                    .or_insert_with(|| Arc::new(StreamDispatchCounters::default()));
1015            }
1016            Some(rx)
1017        } else {
1018            None
1019        }
1020    }
1021
1022    /// Remove a single stream channel sender.
1023    pub fn remove_stream_channel(&self, stream: &str) {
1024        if let Ok(mut guard) = self.stream_senders.lock() {
1025            if let Some(ref mut senders) = *guard {
1026                let stream_key = Self::stream_key(stream);
1027                senders.remove(stream_key);
1028                if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1029                    counters.remove(stream_key);
1030                }
1031            }
1032        }
1033    }
1034
1035    /// Clear all stream senders.
1036    pub fn clear_stream_channels(&self) {
1037        if let Ok(mut guard) = self.stream_senders.lock() {
1038            *guard = None;
1039        }
1040        if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1041            counters.clear();
1042        }
1043    }
1044
1045    /// Returns the current stream dispatch stats keyed by stream type (`"eeg"`, `"mot"`, ...).
1046    pub fn stream_dispatch_stats(&self) -> HashMap<&'static str, StreamDispatchStats> {
1047        if let Ok(counters) = self.stream_dispatch_counters.lock() {
1048            counters
1049                .iter()
1050                .map(|(stream, counter)| (*stream, counter.snapshot()))
1051                .collect()
1052        } else {
1053            HashMap::new()
1054        }
1055    }
1056
1057    /// Returns the number of currently pending RPC responses.
1058    pub async fn pending_response_count(&self) -> usize {
1059        self.pending_responses.lock().await.len()
1060    }
1061
1062    // ─── Authentication ─────────────────────────────────────────────────
1063
1064    /// Query Cortex service version and build info.
1065    ///
1066    /// No authentication required. Useful as a health check.
1067    ///
1068    /// # Errors
1069    /// Returns any error produced by the underlying Cortex API call,
1070    /// including connection, authentication, protocol, timeout, and configuration errors.
1071    pub async fn get_cortex_info(&self) -> CortexResult<serde_json::Value> {
1072        self.call(Methods::GET_CORTEX_INFO, serde_json::json!({}))
1073            .await
1074    }
1075
1076    /// Check if the application has been granted access rights.
1077    ///
1078    /// # Errors
1079    /// Returns any error produced by the underlying Cortex API call,
1080    /// including connection, authentication, protocol, timeout, and configuration errors.
1081    pub async fn has_access_right(
1082        &self,
1083        client_id: &str,
1084        client_secret: &str,
1085    ) -> CortexResult<bool> {
1086        let result = self
1087            .call(
1088                Methods::HAS_ACCESS_RIGHT,
1089                serde_json::json!({
1090                    "clientId": client_id,
1091                    "clientSecret": client_secret,
1092                }),
1093            )
1094            .await?;
1095
1096        Ok(result
1097            .get("accessGranted")
1098            .and_then(serde_json::Value::as_bool)
1099            .unwrap_or(false))
1100    }
1101
1102    /// Get the currently logged-in Emotiv user.
1103    ///
1104    /// # Errors
1105    /// Returns any error produced by the underlying Cortex API call,
1106    /// including connection, authentication, protocol, timeout, and configuration errors.
1107    pub async fn get_user_login(&self) -> CortexResult<Vec<UserLoginInfo>> {
1108        let result = self
1109            .call(Methods::GET_USER_LOGIN, serde_json::json!({}))
1110            .await?;
1111
1112        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1113            reason: format!("Failed to parse user login info: {e}"),
1114        })
1115    }
1116
1117    /// Authenticate with the Cortex API.
1118    ///
1119    /// Performs: `getCortexInfo` → `requestAccess` → `authorize`.
1120    ///
1121    /// Returns the cortex token needed for all subsequent operations.
1122    ///
1123    /// # Errors
1124    /// Returns any error produced by the underlying Cortex API call,
1125    /// including connection, authentication, protocol, timeout, and configuration errors.
1126    pub async fn authenticate(&self, client_id: &str, client_secret: &str) -> CortexResult<String> {
1127        // Step 0: getCortexInfo — verify API is alive
1128        let cortex_info_ok = match self.get_cortex_info().await {
1129            Ok(info) => {
1130                tracing::info!("Cortex API info: {}", info);
1131                true
1132            }
1133            Err(e) => {
1134                tracing::warn!("getCortexInfo failed (continuing anyway): {}", e);
1135                false
1136            }
1137        };
1138
1139        // Step 1: requestAccess — gracefully skip if method doesn't exist
1140        match self
1141            .call(
1142                Methods::REQUEST_ACCESS,
1143                serde_json::json!({
1144                    "clientId": client_id,
1145                    "clientSecret": client_secret,
1146                }),
1147            )
1148            .await
1149        {
1150            Ok(_) => tracing::debug!("Cortex access requested"),
1151            Err(CortexError::MethodNotFound { .. }) => {
1152                tracing::info!(
1153                    "requestAccess not available on this Cortex version \
1154                     (Launcher handles app approval directly)"
1155                );
1156            }
1157            Err(e) => return Err(e),
1158        }
1159
1160        // Step 2: authorize and get a cortex token
1161        let auth_result = match self
1162            .call(
1163                Methods::AUTHORIZE,
1164                serde_json::json!({
1165                    "clientId": client_id,
1166                    "clientSecret": client_secret,
1167                }),
1168            )
1169            .await
1170        {
1171            Ok(result) => result,
1172            Err(CortexError::MethodNotFound { .. }) => {
1173                if !cortex_info_ok {
1174                    tracing::error!(
1175                        "Both getCortexInfo and authorize returned 'Method not found'. \
1176                         The service may not be the Emotiv Cortex API, or may be incompatible."
1177                    );
1178                }
1179                return Err(CortexError::AuthenticationFailed {
1180                    reason: "Cortex API 'authorize' method not found (-32601). \
1181                             Check that the EMOTIV Launcher is running and you are logged in."
1182                        .into(),
1183                });
1184            }
1185            Err(e) => return Err(e),
1186        };
1187
1188        let cortex_token = auth_result
1189            .get("cortexToken")
1190            .and_then(|v| v.as_str())
1191            .ok_or_else(|| CortexError::ProtocolError {
1192                reason: "authorize response missing cortexToken".into(),
1193            })?
1194            .to_string();
1195
1196        tracing::info!("Cortex authentication successful");
1197
1198        Ok(cortex_token)
1199    }
1200
1201    /// Generate a new cortex token (or refresh an existing one).
1202    ///
1203    /// Can be used to obtain a fresh token without the full `requestAccess` → `authorize` flow.
1204    ///
1205    /// # Errors
1206    /// Returns any error produced by the underlying Cortex API call,
1207    /// including connection, authentication, protocol, timeout, and configuration errors.
1208    pub async fn generate_new_token(
1209        &self,
1210        cortex_token: &str,
1211        client_id: &str,
1212        client_secret: &str,
1213    ) -> CortexResult<String> {
1214        let result = self
1215            .call(
1216                Methods::GENERATE_NEW_TOKEN,
1217                serde_json::json!({
1218                    "cortexToken": cortex_token,
1219                    "clientId": client_id,
1220                    "clientSecret": client_secret,
1221                }),
1222            )
1223            .await?;
1224
1225        result
1226            .get("cortexToken")
1227            .and_then(|v| v.as_str())
1228            .map(std::string::ToString::to_string)
1229            .ok_or_else(|| CortexError::ProtocolError {
1230                reason: "generateNewToken response missing cortexToken".into(),
1231            })
1232    }
1233
1234    /// Get information about the current user.
1235    ///
1236    /// # Errors
1237    /// Returns any error produced by the underlying Cortex API call,
1238    /// including connection, authentication, protocol, timeout, and configuration errors.
1239    pub async fn get_user_info(&self, cortex_token: &str) -> CortexResult<serde_json::Value> {
1240        self.call(
1241            Methods::GET_USER_INFO,
1242            serde_json::json!({
1243                "cortexToken": cortex_token,
1244            }),
1245        )
1246        .await
1247    }
1248
1249    /// Get information about the license used by the application.
1250    ///
1251    /// # Errors
1252    /// Returns any error produced by the underlying Cortex API call,
1253    /// including connection, authentication, protocol, timeout, and configuration errors.
1254    pub async fn get_license_info(&self, cortex_token: &str) -> CortexResult<serde_json::Value> {
1255        self.call(
1256            Methods::GET_LICENSE_INFO,
1257            serde_json::json!({
1258                "cortexToken": cortex_token,
1259            }),
1260        )
1261        .await
1262    }
1263
1264    // ─── Headset Management ─────────────────────────────────────────────
1265
1266    /// Query available headsets.
1267    ///
1268    /// # Errors
1269    /// Returns any error produced by the underlying Cortex API call,
1270    /// including connection, authentication, protocol, timeout, and configuration errors.
1271    pub async fn query_headsets(
1272        &self,
1273        options: QueryHeadsetsOptions,
1274    ) -> CortexResult<Vec<HeadsetInfo>> {
1275        let result = self
1276            .call(
1277                Methods::QUERY_HEADSETS,
1278                Self::query_headsets_params(options),
1279            )
1280            .await?;
1281
1282        let headsets: Vec<HeadsetInfo> =
1283            serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1284                reason: format!("Failed to parse headset list: {e}"),
1285            })?;
1286
1287        tracing::info!(count = headsets.len(), "Queried headsets");
1288
1289        Ok(headsets)
1290    }
1291
1292    /// Connect to a specific headset via the Cortex service.
1293    ///
1294    /// # Errors
1295    /// Returns any error produced by the underlying Cortex API call,
1296    /// including connection, authentication, protocol, timeout, and configuration errors.
1297    pub async fn connect_headset(&self, headset_id: &str) -> CortexResult<()> {
1298        self.call(
1299            Methods::CONTROL_DEVICE,
1300            serde_json::json!({
1301                "command": "connect",
1302                "headset": headset_id,
1303            }),
1304        )
1305        .await?;
1306
1307        tracing::info!(headset = headset_id, "Headset connection initiated");
1308        Ok(())
1309    }
1310
1311    /// Disconnect a headset from the Cortex service.
1312    ///
1313    /// # Errors
1314    /// Returns any error produced by the underlying Cortex API call,
1315    /// including connection, authentication, protocol, timeout, and configuration errors.
1316    pub async fn disconnect_headset(&self, headset_id: &str) -> CortexResult<()> {
1317        self.call(
1318            Methods::CONTROL_DEVICE,
1319            serde_json::json!({
1320                "command": "disconnect",
1321                "headset": headset_id,
1322            }),
1323        )
1324        .await?;
1325
1326        tracing::info!(headset = headset_id, "Headset disconnection initiated");
1327        Ok(())
1328    }
1329
1330    /// Trigger headset scanning / refresh.
1331    ///
1332    /// # Errors
1333    /// Returns any error produced by the underlying Cortex API call,
1334    /// including connection, authentication, protocol, timeout, and configuration errors.
1335    pub async fn refresh_headsets(&self) -> CortexResult<()> {
1336        self.call(
1337            Methods::CONTROL_DEVICE,
1338            serde_json::json!({
1339                "command": "refresh",
1340            }),
1341        )
1342        .await?;
1343
1344        tracing::debug!("Headset refresh/scan triggered");
1345        Ok(())
1346    }
1347
1348    /// Synchronize the system clock with the headset clock.
1349    ///
1350    /// Cortex method: `syncWithHeadsetClock`
1351    /// Required state: reachable headset.
1352    /// Parameters: `headset_id`.
1353    /// Returns: typed clock sync details from Cortex.
1354    /// Errors: session/headset/transport errors from Cortex are propagated.
1355    /// Related methods: [`Self::query_headsets`], [`Self::connect_headset`].
1356    ///
1357    /// # Errors
1358    /// Returns any error produced by the underlying Cortex API call,
1359    /// including connection, authentication, protocol, timeout, and configuration errors.
1360    pub async fn sync_with_headset_clock(
1361        &self,
1362        headset_id: &str,
1363    ) -> CortexResult<HeadsetClockSyncResult> {
1364        let result = self
1365            .call(
1366                Methods::SYNC_WITH_HEADSET_CLOCK,
1367                self.sync_with_headset_clock_params(headset_id)?,
1368            )
1369            .await?;
1370
1371        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1372            reason: format!("Failed to parse headset clock sync result: {e}"),
1373        })
1374    }
1375
1376    /// Manage EEG channel mapping configurations for an EPOC Flex headset.
1377    ///
1378    /// # Errors
1379    /// Returns any error produced by the underlying Cortex API call,
1380    /// including connection, authentication, protocol, timeout, and configuration errors.
1381    pub async fn config_mapping(
1382        &self,
1383        cortex_token: &str,
1384        request: ConfigMappingRequest,
1385    ) -> CortexResult<ConfigMappingResponse> {
1386        let (mode, params) = Self::config_mapping_params(cortex_token, request)?;
1387        let result = self.call(Methods::CONFIG_MAPPING, params).await?;
1388
1389        match mode {
1390            ConfigMappingMode::Create | ConfigMappingMode::Read | ConfigMappingMode::Update => {
1391                #[derive(serde::Deserialize)]
1392                struct ValueEnvelope {
1393                    message: String,
1394                    value: ConfigMappingValue,
1395                }
1396                let parsed: ValueEnvelope =
1397                    serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1398                        reason: format!("Failed to parse configMapping value response: {e}"),
1399                    })?;
1400                Ok(ConfigMappingResponse::Value {
1401                    message: parsed.message,
1402                    value: parsed.value,
1403                })
1404            }
1405            ConfigMappingMode::Get => {
1406                #[derive(serde::Deserialize)]
1407                struct ListEnvelope {
1408                    message: String,
1409                    value: ConfigMappingListValue,
1410                }
1411                let parsed: ListEnvelope =
1412                    serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1413                        reason: format!("Failed to parse configMapping get response: {e}"),
1414                    })?;
1415                Ok(ConfigMappingResponse::List {
1416                    message: parsed.message,
1417                    value: parsed.value,
1418                })
1419            }
1420            ConfigMappingMode::Delete => {
1421                #[derive(serde::Deserialize)]
1422                struct DeleteEnvelope {
1423                    message: String,
1424                    uuid: String,
1425                }
1426                let parsed: DeleteEnvelope =
1427                    serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1428                        reason: format!("Failed to parse configMapping delete response: {e}"),
1429                    })?;
1430                Ok(ConfigMappingResponse::Deleted {
1431                    message: parsed.message,
1432                    uuid: parsed.uuid,
1433                })
1434            }
1435        }
1436    }
1437
1438    /// Update settings of an EPOC+ or EPOC X headset.
1439    ///
1440    /// Cortex method: `updateHeadset`
1441    /// Required state: authenticated token.
1442    /// Parameters:
1443    ///
1444    /// - `headset_id`: headset identifier
1445    /// - `setting`: device-specific JSON object (for example:
1446    ///   `{"mode": "EPOC", "eegRate": 256, "memsRate": 64}`)
1447    ///
1448    /// Returns: raw JSON-RPC result payload from Cortex.
1449    /// Errors: validation/headset/license/auth errors are propagated.
1450    /// Retry/idempotency: safe to retry when the same `setting` is reused.
1451    /// Related methods: [`Self::update_headset_custom_info`], [`Self::query_headsets`].
1452    ///
1453    /// # Errors
1454    /// Returns any error produced by the underlying Cortex API call,
1455    /// including connection, authentication, protocol, timeout, and configuration errors.
1456    pub async fn update_headset(
1457        &self,
1458        cortex_token: &str,
1459        headset_id: &str,
1460        setting: serde_json::Value,
1461    ) -> CortexResult<serde_json::Value> {
1462        self.call(
1463            Methods::UPDATE_HEADSET,
1464            serde_json::json!({
1465                "cortexToken": cortex_token,
1466                "headset": headset_id,
1467                "setting": setting,
1468            }),
1469        )
1470        .await
1471    }
1472
1473    /// Update the headband position or custom name of an EPOC X headset.
1474    ///
1475    /// Cortex method: `updateHeadsetCustomInfo`
1476    /// Required state: authenticated token.
1477    /// Parameters:
1478    ///
1479    /// - `headset_id`: headset identifier
1480    /// - `headband_position`: optional position string
1481    /// - `custom_name`: optional display name
1482    ///
1483    /// Returns: raw JSON-RPC result payload from Cortex.
1484    /// Errors: validation/headset/auth errors are propagated.
1485    /// Related methods: [`Self::update_headset`], [`Self::query_headsets`].
1486    ///
1487    /// # Errors
1488    /// Returns any error produced by the underlying Cortex API call,
1489    /// including connection, authentication, protocol, timeout, and configuration errors.
1490    pub async fn update_headset_custom_info(
1491        &self,
1492        cortex_token: &str,
1493        headset_id: &str,
1494        headband_position: Option<&str>,
1495        custom_name: Option<&str>,
1496    ) -> CortexResult<serde_json::Value> {
1497        self.call(
1498            Methods::UPDATE_HEADSET_CUSTOM_INFO,
1499            Self::update_headset_custom_info_params(
1500                cortex_token,
1501                headset_id,
1502                headband_position,
1503                custom_name,
1504            ),
1505        )
1506        .await
1507    }
1508
1509    // ─── Session Management ─────────────────────────────────────────────
1510
1511    /// Create a session for a headset.
1512    ///
1513    /// # Errors
1514    /// Returns any error produced by the underlying Cortex API call,
1515    /// including connection, authentication, protocol, timeout, and configuration errors.
1516    pub async fn create_session(
1517        &self,
1518        cortex_token: &str,
1519        headset_id: &str,
1520    ) -> CortexResult<SessionInfo> {
1521        let result = self
1522            .call(
1523                Methods::CREATE_SESSION,
1524                serde_json::json!({
1525                    "cortexToken": cortex_token,
1526                    "headset": headset_id,
1527                    "status": "active",
1528                }),
1529            )
1530            .await?;
1531
1532        let session: SessionInfo =
1533            serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1534                reason: format!("Failed to parse session info: {e}"),
1535            })?;
1536
1537        tracing::info!(session_id = %session.id, "Session created");
1538        Ok(session)
1539    }
1540
1541    /// Query existing sessions.
1542    ///
1543    /// # Errors
1544    /// Returns any error produced by the underlying Cortex API call,
1545    /// including connection, authentication, protocol, timeout, and configuration errors.
1546    pub async fn query_sessions(&self, cortex_token: &str) -> CortexResult<Vec<SessionInfo>> {
1547        let result = self
1548            .call(
1549                Methods::QUERY_SESSIONS,
1550                serde_json::json!({
1551                    "cortexToken": cortex_token,
1552                }),
1553            )
1554            .await?;
1555
1556        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1557            reason: format!("Failed to parse sessions: {e}"),
1558        })
1559    }
1560
1561    /// Close an active session.
1562    ///
1563    /// Cortex method: `updateSession` with `status = "close"`.
1564    /// Required state: authenticated token and a valid `session_id`.
1565    /// Returns: `Ok(())` only when Cortex confirms the session update call.
1566    /// Errors: session/auth/transport errors are propagated.
1567    /// Retry/idempotency: generally safe to retry close on transient failures.
1568    /// Related methods: [`Self::create_session`], [`Self::query_sessions`].
1569    ///
1570    /// # Errors
1571    /// Returns any error produced by the underlying Cortex API call,
1572    /// including connection, authentication, protocol, timeout, and configuration errors.
1573    pub async fn close_session(&self, cortex_token: &str, session_id: &str) -> CortexResult<()> {
1574        self.call(
1575            Methods::UPDATE_SESSION,
1576            serde_json::json!({
1577                "cortexToken": cortex_token,
1578                "session": session_id,
1579                "status": "close",
1580            }),
1581        )
1582        .await?;
1583
1584        tracing::info!(session_id, "Session closed");
1585        Ok(())
1586    }
1587
1588    // ─── Data Streams ───────────────────────────────────────────────────
1589
1590    /// Subscribe to one or more data streams.
1591    ///
1592    /// # Errors
1593    /// Returns any error produced by the underlying Cortex API call,
1594    /// including connection, authentication, protocol, timeout, and configuration errors.
1595    pub async fn subscribe_streams(
1596        &self,
1597        cortex_token: &str,
1598        session_id: &str,
1599        streams: &[&str],
1600    ) -> CortexResult<serde_json::Value> {
1601        let resp = self
1602            .call(
1603                Methods::SUBSCRIBE,
1604                serde_json::json!({
1605                    "cortexToken": cortex_token,
1606                    "session": session_id,
1607                    "streams": streams,
1608                }),
1609            )
1610            .await?;
1611
1612        tracing::info!(session_id, ?streams, "Subscribed to data streams");
1613        Ok(resp)
1614    }
1615
1616    /// Unsubscribe from one or more data streams.
1617    ///
1618    /// # Errors
1619    /// Returns any error produced by the underlying Cortex API call,
1620    /// including connection, authentication, protocol, timeout, and configuration errors.
1621    pub async fn unsubscribe_streams(
1622        &self,
1623        cortex_token: &str,
1624        session_id: &str,
1625        streams: &[&str],
1626    ) -> CortexResult<()> {
1627        self.call(
1628            Methods::UNSUBSCRIBE,
1629            serde_json::json!({
1630                "cortexToken": cortex_token,
1631                "session": session_id,
1632                "streams": streams,
1633            }),
1634        )
1635        .await?;
1636
1637        tracing::info!(session_id, ?streams, "Unsubscribed from data streams");
1638        Ok(())
1639    }
1640
1641    // ─── Records ────────────────────────────────────────────────────────
1642
1643    /// Start a new recording.
1644    ///
1645    /// # Errors
1646    /// Returns any error produced by the underlying Cortex API call,
1647    /// including connection, authentication, protocol, timeout, and configuration errors.
1648    pub async fn create_record(
1649        &self,
1650        cortex_token: &str,
1651        session_id: &str,
1652        title: &str,
1653    ) -> CortexResult<RecordInfo> {
1654        let result = self
1655            .call(
1656                Methods::CREATE_RECORD,
1657                serde_json::json!({
1658                    "cortexToken": cortex_token,
1659                    "session": session_id,
1660                    "title": title,
1661                }),
1662            )
1663            .await?;
1664
1665        let record_value =
1666            result
1667                .get("record")
1668                .cloned()
1669                .ok_or_else(|| CortexError::ProtocolError {
1670                    reason: "createRecord response missing 'record' field".into(),
1671                })?;
1672
1673        let record: RecordInfo =
1674            serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1675                reason: format!("Failed to parse record info: {e}"),
1676            })?;
1677
1678        tracing::info!(record_id = %record.uuid, "Recording started");
1679        Ok(record)
1680    }
1681
1682    /// Stop an active recording.
1683    ///
1684    /// # Errors
1685    /// Returns any error produced by the underlying Cortex API call,
1686    /// including connection, authentication, protocol, timeout, and configuration errors.
1687    pub async fn stop_record(
1688        &self,
1689        cortex_token: &str,
1690        session_id: &str,
1691    ) -> CortexResult<RecordInfo> {
1692        let result = self
1693            .call(
1694                Methods::STOP_RECORD,
1695                serde_json::json!({
1696                    "cortexToken": cortex_token,
1697                    "session": session_id,
1698                }),
1699            )
1700            .await?;
1701
1702        let record_value =
1703            result
1704                .get("record")
1705                .cloned()
1706                .ok_or_else(|| CortexError::ProtocolError {
1707                    reason: "stopRecord response missing 'record' field".into(),
1708                })?;
1709
1710        let record: RecordInfo =
1711            serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1712                reason: format!("Failed to parse record info: {e}"),
1713            })?;
1714
1715        tracing::info!(record_id = %record.uuid, "Recording stopped");
1716        Ok(record)
1717    }
1718
1719    /// Query recorded sessions.
1720    ///
1721    /// # Errors
1722    /// Returns any error produced by the underlying Cortex API call,
1723    /// including connection, authentication, protocol, timeout, and configuration errors.
1724    pub async fn query_records(
1725        &self,
1726        cortex_token: &str,
1727        limit: Option<u32>,
1728        offset: Option<u32>,
1729    ) -> CortexResult<Vec<RecordInfo>> {
1730        let mut params = serde_json::json!({
1731            "cortexToken": cortex_token,
1732            "query": {},
1733            "orderBy": [{ "startDatetime": "DESC" }],
1734        });
1735
1736        if let Some(limit) = limit {
1737            params["limit"] = serde_json::json!(limit);
1738        }
1739        if let Some(offset) = offset {
1740            params["offset"] = serde_json::json!(offset);
1741        }
1742
1743        let result = self.call(Methods::QUERY_RECORDS, params).await?;
1744
1745        let records = result
1746            .get("records")
1747            .cloned()
1748            .unwrap_or(serde_json::Value::Array(vec![]));
1749
1750        serde_json::from_value(records).map_err(|e| CortexError::ProtocolError {
1751            reason: format!("Failed to parse records: {e}"),
1752        })
1753    }
1754
1755    /// Export a recording to CSV or EDF format.
1756    ///
1757    /// # Errors
1758    /// Returns any error produced by the underlying Cortex API call,
1759    /// including connection, authentication, protocol, timeout, and configuration errors.
1760    pub async fn export_record(
1761        &self,
1762        cortex_token: &str,
1763        record_ids: &[String],
1764        folder: &str,
1765        format: ExportFormat,
1766    ) -> CortexResult<()> {
1767        self.call(
1768            Methods::EXPORT_RECORD,
1769            serde_json::json!({
1770                "cortexToken": cortex_token,
1771                "recordIds": record_ids,
1772                "folder": folder,
1773                "format": format.as_str(),
1774            }),
1775        )
1776        .await?;
1777
1778        tracing::info!(
1779            ?record_ids,
1780            folder,
1781            format = format.as_str(),
1782            "Export initiated"
1783        );
1784        Ok(())
1785    }
1786
1787    /// Update a recording's metadata (title, description, tags).
1788    ///
1789    /// # Errors
1790    /// Returns any error produced by the underlying Cortex API call,
1791    /// including connection, authentication, protocol, timeout, and configuration errors.
1792    pub async fn update_record_with(
1793        &self,
1794        cortex_token: &str,
1795        request: &UpdateRecordRequest,
1796    ) -> CortexResult<RecordInfo> {
1797        let mut params = serde_json::json!({
1798            "cortexToken": cortex_token,
1799            "record": request.record_id.as_str(),
1800        });
1801
1802        if let Some(t) = &request.title {
1803            params["title"] = serde_json::json!(t);
1804        }
1805        if let Some(d) = &request.description {
1806            params["description"] = serde_json::json!(d);
1807        }
1808        if let Some(t) = &request.tags {
1809            params["tags"] = serde_json::json!(t);
1810        }
1811
1812        let result = self.call(Methods::UPDATE_RECORD, params).await?;
1813
1814        let record_value =
1815            result
1816                .get("record")
1817                .cloned()
1818                .ok_or_else(|| CortexError::ProtocolError {
1819                    reason: "updateRecord response missing 'record' field".into(),
1820                })?;
1821
1822        serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1823            reason: format!("Failed to parse record info: {e}"),
1824        })
1825    }
1826
1827    /// Update a recording's metadata (title, description, tags).
1828    #[deprecated(note = "Use `update_record_with` and `UpdateRecordRequest` instead.")]
1829    ///
1830    /// # Errors
1831    /// Returns any error produced by the underlying Cortex API call,
1832    /// including connection, authentication, protocol, timeout, and configuration errors.
1833    pub async fn update_record(
1834        &self,
1835        cortex_token: &str,
1836        record_id: &str,
1837        title: Option<&str>,
1838        description: Option<&str>,
1839        tags: Option<&[String]>,
1840    ) -> CortexResult<RecordInfo> {
1841        let request = UpdateRecordRequest {
1842            record_id: record_id.to_string(),
1843            title: title.map(ToString::to_string),
1844            description: description.map(ToString::to_string),
1845            tags: tags.map(<[std::string::String]>::to_vec),
1846        };
1847        self.update_record_with(cortex_token, &request).await
1848    }
1849
1850    /// Delete one or more recordings.
1851    ///
1852    /// # Errors
1853    /// Returns any error produced by the underlying Cortex API call,
1854    /// including connection, authentication, protocol, timeout, and configuration errors.
1855    pub async fn delete_record(
1856        &self,
1857        cortex_token: &str,
1858        record_ids: &[String],
1859    ) -> CortexResult<serde_json::Value> {
1860        self.call(
1861            Methods::DELETE_RECORD,
1862            serde_json::json!({
1863                "cortexToken": cortex_token,
1864                "records": record_ids,
1865            }),
1866        )
1867        .await
1868    }
1869
1870    /// Get detailed information for specific records by their IDs.
1871    ///
1872    /// # Errors
1873    /// Returns any error produced by the underlying Cortex API call,
1874    /// including connection, authentication, protocol, timeout, and configuration errors.
1875    pub async fn get_record_infos(
1876        &self,
1877        cortex_token: &str,
1878        record_ids: &[String],
1879    ) -> CortexResult<serde_json::Value> {
1880        self.call(
1881            Methods::GET_RECORD_INFOS,
1882            serde_json::json!({
1883                "cortexToken": cortex_token,
1884                "recordIds": record_ids,
1885            }),
1886        )
1887        .await
1888    }
1889
1890    /// Configure the opt-out setting for data sharing.
1891    ///
1892    /// Use `status: "get"` to query, `status: "set"` with `new_opt_out` to change.
1893    ///
1894    /// # Errors
1895    /// Returns any error produced by the underlying Cortex API call,
1896    /// including connection, authentication, protocol, timeout, and configuration errors.
1897    pub async fn config_opt_out(
1898        &self,
1899        cortex_token: &str,
1900        status: &str,
1901        new_opt_out: Option<bool>,
1902    ) -> CortexResult<serde_json::Value> {
1903        let mut params = serde_json::json!({
1904            "cortexToken": cortex_token,
1905            "status": status,
1906        });
1907
1908        if let Some(opt) = new_opt_out {
1909            params["newOptOut"] = serde_json::json!(opt);
1910        }
1911
1912        self.call(Methods::CONFIG_OPT_OUT, params).await
1913    }
1914
1915    /// Request to download recorded data from the Emotiv cloud.
1916    ///
1917    /// # Errors
1918    /// Returns any error produced by the underlying Cortex API call,
1919    /// including connection, authentication, protocol, timeout, and configuration errors.
1920    pub async fn download_record(
1921        &self,
1922        cortex_token: &str,
1923        record_ids: &[String],
1924    ) -> CortexResult<serde_json::Value> {
1925        self.call(
1926            Methods::DOWNLOAD_RECORD,
1927            serde_json::json!({
1928                "cortexToken": cortex_token,
1929                "recordIds": record_ids,
1930            }),
1931        )
1932        .await
1933    }
1934
1935    // ─── Markers ────────────────────────────────────────────────────────
1936
1937    /// Inject a time-stamped marker during an active recording.
1938    ///
1939    /// # Errors
1940    /// Returns any error produced by the underlying Cortex API call,
1941    /// including connection, authentication, protocol, timeout, and configuration errors.
1942    pub async fn inject_marker(
1943        &self,
1944        cortex_token: &str,
1945        session_id: &str,
1946        label: &str,
1947        value: i32,
1948        port: &str,
1949        time: Option<f64>,
1950    ) -> CortexResult<MarkerInfo> {
1951        let epoch_ms = match time {
1952            Some(value) => value,
1953            None => Self::current_epoch_millis()?
1954                .to_string()
1955                .parse::<f64>()
1956                .map_err(|e| CortexError::ProtocolError {
1957                    reason: format!("Failed to convert epoch milliseconds to f64: {e}"),
1958                })?,
1959        };
1960
1961        let params = serde_json::json!({
1962            "cortexToken": cortex_token,
1963            "session": session_id,
1964            "label": label,
1965            "value": value,
1966            "port": port,
1967            "time": epoch_ms,
1968        });
1969
1970        let result = self.call(Methods::INJECT_MARKER, params).await?;
1971
1972        let marker_value =
1973            result
1974                .get("marker")
1975                .cloned()
1976                .ok_or_else(|| CortexError::ProtocolError {
1977                    reason: "injectMarker response missing 'marker' field".into(),
1978                })?;
1979
1980        let marker: MarkerInfo =
1981            serde_json::from_value(marker_value).map_err(|e| CortexError::ProtocolError {
1982                reason: format!("Failed to parse marker info: {e}"),
1983            })?;
1984
1985        tracing::debug!(marker_id = %marker.uuid, label, "Marker injected");
1986        Ok(marker)
1987    }
1988
1989    /// Update a marker to convert it from an instance marker to an interval marker.
1990    ///
1991    /// # Errors
1992    /// Returns any error produced by the underlying Cortex API call,
1993    /// including connection, authentication, protocol, timeout, and configuration errors.
1994    pub async fn update_marker(
1995        &self,
1996        cortex_token: &str,
1997        session_id: &str,
1998        marker_id: &str,
1999        time: Option<f64>,
2000    ) -> CortexResult<()> {
2001        let mut params = serde_json::json!({
2002            "cortexToken": cortex_token,
2003            "session": session_id,
2004            "markerId": marker_id,
2005        });
2006
2007        if let Some(t) = time {
2008            params["time"] = serde_json::json!(t);
2009        }
2010
2011        self.call(Methods::UPDATE_MARKER, params).await?;
2012        tracing::debug!(marker_id, "Marker updated");
2013        Ok(())
2014    }
2015
2016    // ─── Subjects ────────────────────────────────────────────────────────
2017
2018    /// Create a new subject.
2019    ///
2020    /// # Errors
2021    /// Returns any error produced by the underlying Cortex API call,
2022    /// including connection, authentication, protocol, timeout, and configuration errors.
2023    pub async fn create_subject_with(
2024        &self,
2025        cortex_token: &str,
2026        request: &SubjectRequest,
2027    ) -> CortexResult<SubjectInfo> {
2028        let result = self
2029            .call(
2030                Methods::CREATE_SUBJECT,
2031                Self::subject_params(cortex_token, request),
2032            )
2033            .await?;
2034
2035        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2036            reason: format!("Failed to parse subject info: {e}"),
2037        })
2038    }
2039
2040    /// Create a new subject.
2041    #[deprecated(note = "Use `create_subject_with` and `SubjectRequest` instead.")]
2042    #[allow(clippy::too_many_arguments)]
2043    ///
2044    /// # Errors
2045    /// Returns any error produced by the underlying Cortex API call,
2046    /// including connection, authentication, protocol, timeout, and configuration errors.
2047    pub async fn create_subject(
2048        &self,
2049        cortex_token: &str,
2050        subject_name: &str,
2051        date_of_birth: Option<&str>,
2052        sex: Option<&str>,
2053        country_code: Option<&str>,
2054        state: Option<&str>,
2055        city: Option<&str>,
2056        attributes: Option<&[serde_json::Value]>,
2057    ) -> CortexResult<SubjectInfo> {
2058        let request = SubjectRequest {
2059            subject_name: subject_name.to_string(),
2060            date_of_birth: date_of_birth.map(ToString::to_string),
2061            sex: sex.map(ToString::to_string),
2062            country_code: country_code.map(ToString::to_string),
2063            state: state.map(ToString::to_string),
2064            city: city.map(ToString::to_string),
2065            attributes: attributes.map(<[serde_json::Value]>::to_vec),
2066        };
2067        self.create_subject_with(cortex_token, &request).await
2068    }
2069
2070    /// Update an existing subject's information.
2071    ///
2072    /// # Errors
2073    /// Returns any error produced by the underlying Cortex API call,
2074    /// including connection, authentication, protocol, timeout, and configuration errors.
2075    pub async fn update_subject_with(
2076        &self,
2077        cortex_token: &str,
2078        request: &SubjectRequest,
2079    ) -> CortexResult<SubjectInfo> {
2080        let result = self
2081            .call(
2082                Methods::UPDATE_SUBJECT,
2083                Self::subject_params(cortex_token, request),
2084            )
2085            .await?;
2086
2087        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2088            reason: format!("Failed to parse subject info: {e}"),
2089        })
2090    }
2091
2092    /// Update an existing subject's information.
2093    #[deprecated(note = "Use `update_subject_with` and `SubjectRequest` instead.")]
2094    #[allow(clippy::too_many_arguments)]
2095    ///
2096    /// # Errors
2097    /// Returns any error produced by the underlying Cortex API call,
2098    /// including connection, authentication, protocol, timeout, and configuration errors.
2099    pub async fn update_subject(
2100        &self,
2101        cortex_token: &str,
2102        subject_name: &str,
2103        date_of_birth: Option<&str>,
2104        sex: Option<&str>,
2105        country_code: Option<&str>,
2106        state: Option<&str>,
2107        city: Option<&str>,
2108        attributes: Option<&[serde_json::Value]>,
2109    ) -> CortexResult<SubjectInfo> {
2110        let request = SubjectRequest {
2111            subject_name: subject_name.to_string(),
2112            date_of_birth: date_of_birth.map(ToString::to_string),
2113            sex: sex.map(ToString::to_string),
2114            country_code: country_code.map(ToString::to_string),
2115            state: state.map(ToString::to_string),
2116            city: city.map(ToString::to_string),
2117            attributes: attributes.map(<[serde_json::Value]>::to_vec),
2118        };
2119        self.update_subject_with(cortex_token, &request).await
2120    }
2121
2122    /// Delete one or more subjects.
2123    ///
2124    /// # Errors
2125    /// Returns any error produced by the underlying Cortex API call,
2126    /// including connection, authentication, protocol, timeout, and configuration errors.
2127    pub async fn delete_subjects(
2128        &self,
2129        cortex_token: &str,
2130        subject_names: &[String],
2131    ) -> CortexResult<serde_json::Value> {
2132        self.call(
2133            Methods::DELETE_SUBJECTS,
2134            serde_json::json!({
2135                "cortexToken": cortex_token,
2136                "subjects": subject_names,
2137            }),
2138        )
2139        .await
2140    }
2141
2142    /// Query subjects with filtering, sorting, and pagination.
2143    ///
2144    /// Returns a tuple of (subjects, `total_count`).
2145    ///
2146    /// # Errors
2147    /// Returns any error produced by the underlying Cortex API call,
2148    /// including connection, authentication, protocol, timeout, and configuration errors.
2149    pub async fn query_subjects_with(
2150        &self,
2151        cortex_token: &str,
2152        request: &QuerySubjectsRequest,
2153    ) -> CortexResult<(Vec<SubjectInfo>, u32)> {
2154        let result = self
2155            .call(
2156                Methods::QUERY_SUBJECTS,
2157                Self::query_subjects_params(cortex_token, request),
2158            )
2159            .await?;
2160
2161        let count = result
2162            .get("count")
2163            .and_then(serde_json::Value::as_u64)
2164            .and_then(|value| u32::try_from(value).ok())
2165            .unwrap_or(0);
2166
2167        let subjects_value = result
2168            .get("subjects")
2169            .cloned()
2170            .unwrap_or(serde_json::Value::Array(vec![]));
2171
2172        let subjects: Vec<SubjectInfo> =
2173            serde_json::from_value(subjects_value).map_err(|e| CortexError::ProtocolError {
2174                reason: format!("Failed to parse subjects: {e}"),
2175            })?;
2176
2177        Ok((subjects, count))
2178    }
2179
2180    /// Query subjects with filtering, sorting, and pagination.
2181    ///
2182    /// Returns a tuple of (subjects, `total_count`).
2183    #[deprecated(note = "Use `query_subjects_with` and `QuerySubjectsRequest` instead.")]
2184    ///
2185    /// # Errors
2186    /// Returns any error produced by the underlying Cortex API call,
2187    /// including connection, authentication, protocol, timeout, and configuration errors.
2188    pub async fn query_subjects(
2189        &self,
2190        cortex_token: &str,
2191        query: serde_json::Value,
2192        order_by: serde_json::Value,
2193        limit: Option<u32>,
2194        offset: Option<u32>,
2195    ) -> CortexResult<(Vec<SubjectInfo>, u32)> {
2196        let request = QuerySubjectsRequest {
2197            query,
2198            order_by,
2199            limit,
2200            offset,
2201        };
2202        self.query_subjects_with(cortex_token, &request).await
2203    }
2204
2205    /// Get the list of valid demographic attributes.
2206    ///
2207    /// # Errors
2208    /// Returns any error produced by the underlying Cortex API call,
2209    /// including connection, authentication, protocol, timeout, and configuration errors.
2210    pub async fn get_demographic_attributes(
2211        &self,
2212        cortex_token: &str,
2213    ) -> CortexResult<Vec<DemographicAttribute>> {
2214        let result = self
2215            .call(
2216                Methods::GET_DEMOGRAPHIC_ATTRIBUTES,
2217                serde_json::json!({
2218                    "cortexToken": cortex_token,
2219                }),
2220            )
2221            .await?;
2222
2223        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2224            reason: format!("Failed to parse demographic attributes: {e}"),
2225        })
2226    }
2227
2228    // ─── Profiles ───────────────────────────────────────────────────────
2229
2230    /// List all profiles for the current user.
2231    ///
2232    /// # Errors
2233    /// Returns any error produced by the underlying Cortex API call,
2234    /// including connection, authentication, protocol, timeout, and configuration errors.
2235    pub async fn query_profiles(&self, cortex_token: &str) -> CortexResult<Vec<ProfileInfo>> {
2236        let result = self
2237            .call(
2238                Methods::QUERY_PROFILE,
2239                serde_json::json!({
2240                    "cortexToken": cortex_token,
2241                }),
2242            )
2243            .await?;
2244
2245        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2246            reason: format!("Failed to parse profiles: {e}"),
2247        })
2248    }
2249
2250    /// Get the profile currently loaded for a headset.
2251    ///
2252    ///
2253    /// # Errors
2254    /// Returns any error produced by the underlying Cortex API call,
2255    /// including connection, authentication, protocol, timeout, and configuration errors.
2256    pub async fn get_current_profile(
2257        &self,
2258        cortex_token: &str,
2259        headset_id: &str,
2260    ) -> CortexResult<CurrentProfileInfo> {
2261        let result = self
2262            .call(
2263                Methods::GET_CURRENT_PROFILE,
2264                serde_json::json!({
2265                    "cortexToken": cortex_token,
2266                    "headset": headset_id,
2267                }),
2268            )
2269            .await?;
2270
2271        let profile: CurrentProfileInfo =
2272            serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2273                reason: format!("Failed to parse current profile info: {e}"),
2274            })?;
2275
2276        Ok(profile)
2277    }
2278
2279    /// Manage a profile (create, load, unload, save, rename, delete).
2280    ///
2281    /// # Errors
2282    /// Returns any error produced by the underlying Cortex API call,
2283    /// including connection, authentication, protocol, timeout, and configuration errors.
2284    pub async fn setup_profile(
2285        &self,
2286        cortex_token: &str,
2287        headset_id: &str,
2288        profile_name: &str,
2289        action: ProfileAction,
2290    ) -> CortexResult<()> {
2291        self.call(
2292            Methods::SETUP_PROFILE,
2293            serde_json::json!({
2294                "cortexToken": cortex_token,
2295                "headset": headset_id,
2296                "profile": profile_name,
2297                "status": action.as_str(),
2298            }),
2299        )
2300        .await?;
2301
2302        tracing::info!(
2303            profile = profile_name,
2304            action = action.as_str(),
2305            "Profile action completed"
2306        );
2307        Ok(())
2308    }
2309
2310    /// Load an empty guest profile for a headset.
2311    ///
2312    /// This unloads any currently loaded profile and loads a blank guest profile,
2313    /// useful for starting fresh without trained data.
2314    ///
2315    /// # Errors
2316    /// Returns any error produced by the underlying Cortex API call,
2317    /// including connection, authentication, protocol, timeout, and configuration errors.
2318    pub async fn load_guest_profile(
2319        &self,
2320        cortex_token: &str,
2321        headset_id: &str,
2322    ) -> CortexResult<()> {
2323        self.call(
2324            Methods::LOAD_GUEST_PROFILE,
2325            serde_json::json!({
2326                "cortexToken": cortex_token,
2327                "headset": headset_id,
2328            }),
2329        )
2330        .await?;
2331
2332        tracing::info!(headset = headset_id, "Guest profile loaded");
2333        Ok(())
2334    }
2335
2336    // ─── BCI / Training ─────────────────────────────────────────────────
2337
2338    /// Get detection info for a specific detection type.
2339    ///
2340    /// # Errors
2341    /// Returns any error produced by the underlying Cortex API call,
2342    /// including connection, authentication, protocol, timeout, and configuration errors.
2343    pub async fn get_detection_info(
2344        &self,
2345        detection: DetectionType,
2346    ) -> CortexResult<DetectionInfo> {
2347        let result = self
2348            .call(
2349                Methods::GET_DETECTION_INFO,
2350                serde_json::json!({
2351                    "detection": detection.as_str(),
2352                }),
2353            )
2354            .await?;
2355
2356        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2357            reason: format!("Failed to parse detection info: {e}"),
2358        })
2359    }
2360
2361    /// Control the training lifecycle for mental commands or facial expressions.
2362    ///
2363    /// # Errors
2364    /// Returns any error produced by the underlying Cortex API call,
2365    /// including connection, authentication, protocol, timeout, and configuration errors.
2366    pub async fn training(
2367        &self,
2368        cortex_token: &str,
2369        session_id: &str,
2370        detection: DetectionType,
2371        status: TrainingStatus,
2372        action: &str,
2373    ) -> CortexResult<serde_json::Value> {
2374        self.call(
2375            Methods::TRAINING,
2376            serde_json::json!({
2377                "cortexToken": cortex_token,
2378                "session": session_id,
2379                "detection": detection.as_str(),
2380                "status": status.as_str(),
2381                "action": action,
2382            }),
2383        )
2384        .await
2385    }
2386
2387    /// Get or set the active mental command actions.
2388    ///
2389    /// # Errors
2390    /// Returns any error produced by the underlying Cortex API call,
2391    /// including connection, authentication, protocol, timeout, and configuration errors.
2392    pub async fn mental_command_active_action(
2393        &self,
2394        cortex_token: &str,
2395        session_id: &str,
2396        actions: Option<&[&str]>,
2397    ) -> CortexResult<serde_json::Value> {
2398        let mut params = serde_json::json!({
2399            "cortexToken": cortex_token,
2400            "session": session_id,
2401            "status": if actions.is_some() { "set" } else { "get" },
2402        });
2403
2404        if let Some(actions) = actions {
2405            params["actions"] = serde_json::json!(actions);
2406        }
2407
2408        self.call(Methods::MENTAL_COMMAND_ACTIVE_ACTION, params)
2409            .await
2410    }
2411
2412    /// Get or set the mental command action sensitivity.
2413    ///
2414    /// # Errors
2415    /// Returns any error produced by the underlying Cortex API call,
2416    /// including connection, authentication, protocol, timeout, and configuration errors.
2417    pub async fn mental_command_action_sensitivity(
2418        &self,
2419        cortex_token: &str,
2420        session_id: &str,
2421        values: Option<&[i32]>,
2422    ) -> CortexResult<serde_json::Value> {
2423        let mut params = serde_json::json!({
2424            "cortexToken": cortex_token,
2425            "session": session_id,
2426            "status": if values.is_some() { "set" } else { "get" },
2427        });
2428
2429        if let Some(values) = values {
2430            params["values"] = serde_json::json!(values);
2431        }
2432
2433        self.call(Methods::MENTAL_COMMAND_ACTION_SENSITIVITY, params)
2434            .await
2435    }
2436
2437    /// Get the mental command brain map.
2438    ///
2439    /// # Errors
2440    /// Returns any error produced by the underlying Cortex API call,
2441    /// including connection, authentication, protocol, timeout, and configuration errors.
2442    pub async fn mental_command_brain_map(
2443        &self,
2444        cortex_token: &str,
2445        session_id: &str,
2446    ) -> CortexResult<serde_json::Value> {
2447        self.call(
2448            Methods::MENTAL_COMMAND_BRAIN_MAP,
2449            serde_json::json!({
2450                "cortexToken": cortex_token,
2451                "session": session_id,
2452            }),
2453        )
2454        .await
2455    }
2456
2457    /// Get the mental command training threshold for an active session.
2458    ///
2459    /// Cortex method: `mentalCommandTrainingThreshold`
2460    /// Required state: authenticated token and active session.
2461    /// Parameters: `session_id` selects the target session.
2462    /// Returns: raw JSON payload from Cortex.
2463    /// Related methods:
2464    /// [`Self::mental_command_training_threshold_with_params`],
2465    /// [`Self::mental_command_training_threshold_for_profile`].
2466    ///
2467    /// # Errors
2468    /// Returns any error produced by the underlying Cortex API call,
2469    /// including connection, authentication, protocol, timeout, and configuration errors.
2470    pub async fn mental_command_training_threshold(
2471        &self,
2472        cortex_token: &str,
2473        session_id: &str,
2474    ) -> CortexResult<serde_json::Value> {
2475        let request = MentalCommandTrainingThresholdRequest {
2476            session_id: Some(session_id.to_string()),
2477            profile: None,
2478            status: None,
2479            value: None,
2480        };
2481        self.mental_command_training_threshold_with_request(cortex_token, &request)
2482            .await
2483    }
2484
2485    /// Get or set the mental command training threshold for a profile.
2486    ///
2487    /// Set `status` to `Some("set")` and provide `value` to update.
2488    /// Use `status = None` (or `Some("get")`) to read the threshold.
2489    ///
2490    /// # Errors
2491    /// Returns any error produced by the underlying Cortex API call,
2492    /// including connection, authentication, protocol, timeout, and configuration errors.
2493    pub async fn mental_command_training_threshold_for_profile(
2494        &self,
2495        cortex_token: &str,
2496        profile: &str,
2497        status: Option<&str>,
2498        value: Option<f64>,
2499    ) -> CortexResult<serde_json::Value> {
2500        let request = MentalCommandTrainingThresholdRequest {
2501            session_id: None,
2502            profile: Some(profile.to_string()),
2503            status: status.map(ToString::to_string),
2504            value,
2505        };
2506        self.mental_command_training_threshold_with_request(cortex_token, &request)
2507            .await
2508    }
2509
2510    /// Get or set the mental command training threshold using a typed request.
2511    ///
2512    /// # Errors
2513    /// Returns any error produced by the underlying Cortex API call,
2514    /// including connection, authentication, protocol, timeout, and configuration errors.
2515    pub async fn mental_command_training_threshold_with_request(
2516        &self,
2517        cortex_token: &str,
2518        request: &MentalCommandTrainingThresholdRequest,
2519    ) -> CortexResult<serde_json::Value> {
2520        let params = Self::mental_command_training_threshold_params(
2521            cortex_token,
2522            request.session_id.as_deref(),
2523            request.profile.as_deref(),
2524            request.status.as_deref(),
2525            request.value,
2526        )?;
2527
2528        self.call(Methods::MENTAL_COMMAND_TRAINING_THRESHOLD, params)
2529            .await
2530    }
2531
2532    /// Get or set the mental command training threshold using either session
2533    /// or profile targeting.
2534    ///
2535    /// Exactly one of `session_id` or `profile` must be provided.
2536    /// If `status` is `None`, this infers `"get"` when `value` is `None`,
2537    /// otherwise `"set"`.
2538    ///
2539    /// # Errors
2540    /// Returns any error produced by the underlying Cortex API call,
2541    /// including connection, authentication, protocol, timeout, and configuration errors.
2542    #[deprecated(
2543        note = "Use `mental_command_training_threshold_with_request` and `MentalCommandTrainingThresholdRequest` instead."
2544    )]
2545    pub async fn mental_command_training_threshold_with_params(
2546        &self,
2547        cortex_token: &str,
2548        session_id: Option<&str>,
2549        profile: Option<&str>,
2550        status: Option<&str>,
2551        value: Option<f64>,
2552    ) -> CortexResult<serde_json::Value> {
2553        let request = MentalCommandTrainingThresholdRequest {
2554            session_id: session_id.map(ToString::to_string),
2555            profile: profile.map(ToString::to_string),
2556            status: status.map(ToString::to_string),
2557            value,
2558        };
2559        self.mental_command_training_threshold_with_request(cortex_token, &request)
2560            .await
2561    }
2562
2563    /// Get a list of trained actions for a profile's detection type.
2564    ///
2565    /// Specify either `profile` (by name) or `session` (by ID), not both.
2566    ///
2567    /// # Errors
2568    /// Returns any error produced by the underlying Cortex API call,
2569    /// including connection, authentication, protocol, timeout, and configuration errors.
2570    pub async fn get_trained_signature_actions(
2571        &self,
2572        cortex_token: &str,
2573        detection: DetectionType,
2574        profile: Option<&str>,
2575        session: Option<&str>,
2576    ) -> CortexResult<TrainedSignatureActions> {
2577        let mut params = serde_json::json!({
2578            "cortexToken": cortex_token,
2579            "detection": detection.as_str(),
2580        });
2581
2582        if let Some(p) = profile {
2583            params["profile"] = serde_json::json!(p);
2584        }
2585        if let Some(s) = session {
2586            params["session"] = serde_json::json!(s);
2587        }
2588
2589        let result = self
2590            .call(Methods::GET_TRAINED_SIGNATURE_ACTIONS, params)
2591            .await?;
2592
2593        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2594            reason: format!("Failed to parse trained signature actions: {e}"),
2595        })
2596    }
2597
2598    /// Get the duration of a training session.
2599    ///
2600    /// # Errors
2601    /// Returns any error produced by the underlying Cortex API call,
2602    /// including connection, authentication, protocol, timeout, and configuration errors.
2603    pub async fn get_training_time(
2604        &self,
2605        cortex_token: &str,
2606        detection: DetectionType,
2607        session_id: &str,
2608    ) -> CortexResult<TrainingTime> {
2609        let result = self
2610            .call(
2611                Methods::GET_TRAINING_TIME,
2612                serde_json::json!({
2613                    "cortexToken": cortex_token,
2614                    "detection": detection.as_str(),
2615                    "session": session_id,
2616                }),
2617            )
2618            .await?;
2619
2620        serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2621            reason: format!("Failed to parse training time: {e}"),
2622        })
2623    }
2624
2625    /// Get or set the facial expression signature type.
2626    ///
2627    /// Use `status: "get"` to query, `status: "set"` with `signature` to change.
2628    /// Specify either `profile` or `session`, not both.
2629    ///
2630    /// # Errors
2631    /// Returns any error produced by the underlying Cortex API call,
2632    /// including connection, authentication, protocol, timeout, and configuration errors.
2633    pub async fn facial_expression_signature_type_with(
2634        &self,
2635        cortex_token: &str,
2636        request: &FacialExpressionSignatureTypeRequest,
2637    ) -> CortexResult<serde_json::Value> {
2638        self.call(
2639            Methods::FACIAL_EXPRESSION_SIGNATURE_TYPE,
2640            Self::facial_expression_signature_type_params(cortex_token, request),
2641        )
2642        .await
2643    }
2644
2645    /// Get or set the facial expression signature type.
2646    ///
2647    /// Use `status: "get"` to query, `status: "set"` with `signature` to change.
2648    /// Specify either `profile` or `session`, not both.
2649    ///
2650    /// # Errors
2651    /// Returns any error produced by the underlying Cortex API call,
2652    /// including connection, authentication, protocol, timeout, and configuration errors.
2653    #[deprecated(
2654        note = "Use `facial_expression_signature_type_with` and `FacialExpressionSignatureTypeRequest` instead."
2655    )]
2656    pub async fn facial_expression_signature_type(
2657        &self,
2658        cortex_token: &str,
2659        status: &str,
2660        profile: Option<&str>,
2661        session: Option<&str>,
2662        signature: Option<&str>,
2663    ) -> CortexResult<serde_json::Value> {
2664        let request = FacialExpressionSignatureTypeRequest {
2665            status: status.to_string(),
2666            profile: profile.map(ToString::to_string),
2667            session: session.map(ToString::to_string),
2668            signature: signature.map(ToString::to_string),
2669        };
2670        self.facial_expression_signature_type_with(cortex_token, &request)
2671            .await
2672    }
2673
2674    /// Get or set the threshold of a facial expression action.
2675    ///
2676    /// Use `status: "get"` to query, `status: "set"` with `value` to change.
2677    /// Specify either `profile` or `session`, not both.
2678    /// The `value` range is 0–1000.
2679    ///
2680    /// # Errors
2681    /// Returns any error produced by the underlying Cortex API call,
2682    /// including connection, authentication, protocol, timeout, and configuration errors.
2683    pub async fn facial_expression_threshold_with(
2684        &self,
2685        cortex_token: &str,
2686        request: &FacialExpressionThresholdRequest,
2687    ) -> CortexResult<serde_json::Value> {
2688        self.call(
2689            Methods::FACIAL_EXPRESSION_THRESHOLD,
2690            Self::facial_expression_threshold_params(cortex_token, request),
2691        )
2692        .await
2693    }
2694
2695    /// Get or set the threshold of a facial expression action.
2696    ///
2697    /// Use `status: "get"` to query, `status: "set"` with `value` to change.
2698    /// Specify either `profile` or `session`, not both.
2699    /// The `value` range is 0–1000.
2700    ///
2701    /// # Errors
2702    /// Returns any error produced by the underlying Cortex API call,
2703    /// including connection, authentication, protocol, timeout, and configuration errors.
2704    #[deprecated(
2705        note = "Use `facial_expression_threshold_with` and `FacialExpressionThresholdRequest` instead."
2706    )]
2707    pub async fn facial_expression_threshold(
2708        &self,
2709        cortex_token: &str,
2710        status: &str,
2711        action: &str,
2712        profile: Option<&str>,
2713        session: Option<&str>,
2714        value: Option<u32>,
2715    ) -> CortexResult<serde_json::Value> {
2716        let request = FacialExpressionThresholdRequest {
2717            status: status.to_string(),
2718            action: action.to_string(),
2719            profile: profile.map(ToString::to_string),
2720            session: session.map(ToString::to_string),
2721            value,
2722        };
2723        self.facial_expression_threshold_with(cortex_token, &request)
2724            .await
2725    }
2726
2727    // ─── Connection Management ──────────────────────────────────────────
2728
2729    /// Returns whether the reader loop is still running.
2730    pub fn is_connected(&self) -> bool {
2731        self.reader_running.load(Ordering::SeqCst)
2732    }
2733
2734    /// Stop the reader loop.
2735    pub async fn stop_reader(&mut self) {
2736        self.reader_running.store(false, Ordering::SeqCst);
2737        let _ = self.reader_shutdown.send(true);
2738        if let Some(handle) = self.reader_handle.take() {
2739            let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
2740        }
2741    }
2742
2743    /// Close the WebSocket connection.
2744    ///
2745    /// # Errors
2746    /// Returns any error produced by the underlying Cortex API call,
2747    /// including connection, authentication, protocol, timeout, and configuration errors.
2748    pub async fn disconnect(&mut self) -> CortexResult<()> {
2749        self.stop_reader().await;
2750
2751        let mut writer = self.writer.lock().await;
2752        let _ = writer.close().await;
2753
2754        Ok(())
2755    }
2756}
2757
2758#[cfg(test)]
2759mod tests {
2760    use super::*;
2761
2762    #[test]
2763    fn test_query_headsets_params_default_is_empty() {
2764        let params = CortexClient::query_headsets_params(QueryHeadsetsOptions::default());
2765        assert_eq!(params, serde_json::json!({}));
2766    }
2767
2768    #[test]
2769    fn test_query_headsets_params_with_id() {
2770        let params = CortexClient::query_headsets_params(QueryHeadsetsOptions {
2771            id: Some("HS-123".into()),
2772            include_flex_mappings: false,
2773        });
2774        assert_eq!(params["id"], "HS-123");
2775        assert!(params.get("includeFlexMappings").is_none());
2776    }
2777
2778    #[test]
2779    fn test_query_headsets_params_with_include_flex_mappings() {
2780        let params = CortexClient::query_headsets_params(QueryHeadsetsOptions {
2781            id: None,
2782            include_flex_mappings: true,
2783        });
2784        assert_eq!(params["includeFlexMappings"], true);
2785        assert!(params.get("id").is_none());
2786    }
2787
2788    #[test]
2789    fn test_query_headsets_params_with_both_options() {
2790        let params = CortexClient::query_headsets_params(QueryHeadsetsOptions {
2791            id: Some("HS-123".into()),
2792            include_flex_mappings: true,
2793        });
2794        assert_eq!(params["id"], "HS-123");
2795        assert_eq!(params["includeFlexMappings"], true);
2796    }
2797
2798    #[test]
2799    fn test_sync_with_headset_clock_params_use_docs_shape() {
2800        let params = CortexClient::sync_with_headset_clock_params_with_times("HS-123", 12.34, 5678);
2801        assert_eq!(params["headset"], "HS-123");
2802        assert_eq!(params["monotonicTime"], 12.34);
2803        assert_eq!(params["systemTime"], 5678);
2804        assert!(params.get("cortexToken").is_none());
2805        assert!(params.get("headsetId").is_none());
2806    }
2807
2808    #[test]
2809    fn test_update_headset_custom_info_uses_headset_id() {
2810        let params = CortexClient::update_headset_custom_info_params(
2811            "token",
2812            "HS-123",
2813            Some("front"),
2814            Some("My Headset"),
2815        );
2816        assert_eq!(params["headsetId"], "HS-123");
2817        assert_eq!(params["headset"], "HS-123");
2818        assert_eq!(params["headbandPosition"], "front");
2819        assert_eq!(params["customName"], "My Headset");
2820    }
2821
2822    #[test]
2823    fn test_update_headset_custom_info_omits_optional_fields_when_none() {
2824        let params = CortexClient::update_headset_custom_info_params("token", "HS-123", None, None);
2825        assert_eq!(params["headsetId"], "HS-123");
2826        assert_eq!(params["headset"], "HS-123");
2827        assert!(params.get("headbandPosition").is_none());
2828        assert!(params.get("customName").is_none());
2829    }
2830
2831    #[test]
2832    fn test_config_mapping_create_params_validation() {
2833        let empty_name = CortexClient::config_mapping_params(
2834            "token",
2835            ConfigMappingRequest::Create {
2836                name: "   ".into(),
2837                mappings: serde_json::json!({"CMS":"TP9"}),
2838            },
2839        );
2840        assert!(matches!(empty_name, Err(CortexError::ProtocolError { .. })));
2841
2842        let invalid = CortexClient::config_mapping_params(
2843            "token",
2844            ConfigMappingRequest::Create {
2845                name: "cfg".into(),
2846                mappings: serde_json::json!(["not-an-object"]),
2847            },
2848        );
2849        assert!(matches!(invalid, Err(CortexError::ProtocolError { .. })));
2850
2851        let valid = CortexClient::config_mapping_params(
2852            "token",
2853            ConfigMappingRequest::Create {
2854                name: "cfg".into(),
2855                mappings: serde_json::json!({"CMS":"TP9"}),
2856            },
2857        )
2858        .unwrap();
2859
2860        assert!(matches!(valid.0, ConfigMappingMode::Create));
2861        assert_eq!(valid.1["status"], "create");
2862        assert_eq!(valid.1["name"], "cfg");
2863    }
2864
2865    #[test]
2866    fn test_config_mapping_read_and_delete_require_uuid() {
2867        let read = CortexClient::config_mapping_params(
2868            "token",
2869            ConfigMappingRequest::Read {
2870                uuid: String::new(),
2871            },
2872        );
2873        assert!(matches!(read, Err(CortexError::ProtocolError { .. })));
2874
2875        let delete = CortexClient::config_mapping_params(
2876            "token",
2877            ConfigMappingRequest::Delete {
2878                uuid: String::new(),
2879            },
2880        );
2881        assert!(matches!(delete, Err(CortexError::ProtocolError { .. })));
2882    }
2883
2884    #[test]
2885    fn test_config_mapping_update_requires_name_or_mappings() {
2886        let missing = CortexClient::config_mapping_params(
2887            "token",
2888            ConfigMappingRequest::Update {
2889                uuid: "uuid-1".into(),
2890                name: None,
2891                mappings: None,
2892            },
2893        );
2894        assert!(matches!(missing, Err(CortexError::ProtocolError { .. })));
2895
2896        let valid = CortexClient::config_mapping_params(
2897            "token",
2898            ConfigMappingRequest::Update {
2899                uuid: "uuid-1".into(),
2900                name: Some("new".into()),
2901                mappings: None,
2902            },
2903        )
2904        .unwrap();
2905        assert!(matches!(valid.0, ConfigMappingMode::Update));
2906        assert_eq!(valid.1["uuid"], "uuid-1");
2907        assert_eq!(valid.1["name"], "new");
2908    }
2909
2910    #[test]
2911    fn test_config_mapping_update_validation_cases() {
2912        let empty_uuid = CortexClient::config_mapping_params(
2913            "token",
2914            ConfigMappingRequest::Update {
2915                uuid: "  ".into(),
2916                name: Some("new-name".into()),
2917                mappings: None,
2918            },
2919        );
2920        assert!(matches!(empty_uuid, Err(CortexError::ProtocolError { .. })));
2921
2922        let empty_name = CortexClient::config_mapping_params(
2923            "token",
2924            ConfigMappingRequest::Update {
2925                uuid: "uuid-1".into(),
2926                name: Some(String::new()),
2927                mappings: None,
2928            },
2929        );
2930        assert!(matches!(empty_name, Err(CortexError::ProtocolError { .. })));
2931
2932        let invalid_mappings = CortexClient::config_mapping_params(
2933            "token",
2934            ConfigMappingRequest::Update {
2935                uuid: "uuid-1".into(),
2936                name: None,
2937                mappings: Some(serde_json::json!(["bad-shape"])),
2938            },
2939        );
2940        assert!(matches!(
2941            invalid_mappings,
2942            Err(CortexError::ProtocolError { .. })
2943        ));
2944    }
2945
2946    #[test]
2947    fn test_mental_command_training_threshold_params_get_and_set_modes() {
2948        let get_params = CortexClient::mental_command_training_threshold_params(
2949            "token",
2950            Some("session-1"),
2951            None,
2952            None,
2953            None,
2954        )
2955        .unwrap();
2956        assert_eq!(get_params["status"], "get");
2957        assert_eq!(get_params["session"], "session-1");
2958        assert!(get_params.get("value").is_none());
2959
2960        let set_params = CortexClient::mental_command_training_threshold_params(
2961            "token",
2962            None,
2963            Some("profile-a"),
2964            Some("set"),
2965            Some(0.42),
2966        )
2967        .unwrap();
2968        assert_eq!(set_params["status"], "set");
2969        assert_eq!(set_params["profile"], "profile-a");
2970        assert_eq!(set_params["value"], 0.42);
2971    }
2972
2973    #[test]
2974    fn test_mental_command_training_threshold_params_validation() {
2975        let both = CortexClient::mental_command_training_threshold_params(
2976            "token",
2977            Some("session-1"),
2978            Some("profile-a"),
2979            None,
2980            None,
2981        );
2982        assert!(matches!(both, Err(CortexError::ProtocolError { .. })));
2983
2984        let neither =
2985            CortexClient::mental_command_training_threshold_params("token", None, None, None, None);
2986        assert!(matches!(neither, Err(CortexError::ProtocolError { .. })));
2987    }
2988}