1use std::collections::HashMap;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
48
49use futures_util::{SinkExt, StreamExt, stream::SplitSink, stream::SplitStream};
50#[cfg(all(feature = "native-tls", not(feature = "rustls-tls")))]
51use native_tls::TlsConnector as NativeTlsConnector;
52#[cfg(feature = "rustls-tls")]
53use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
54#[cfg(feature = "rustls-tls")]
55use rustls::{DigitallySignedStruct, Error as RustlsError, SignatureScheme};
56#[cfg(feature = "rustls-tls")]
57use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
58use tokio::net::TcpStream;
59use tokio::sync::mpsc::error::TrySendError;
60use tokio::sync::{Mutex, mpsc, oneshot};
61use tokio::task::JoinHandle;
62#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
63use tokio_tungstenite::connect_async_tls_with_config;
64#[cfg(not(any(feature = "native-tls", feature = "rustls-tls")))]
65use tokio_tungstenite::tungstenite::error::UrlError;
66use tokio_tungstenite::{
67 Connector, MaybeTlsStream, WebSocketStream,
68 tungstenite::{Message, http},
69};
70
71use crate::config::CortexConfig;
72use crate::error::{CortexError, CortexResult};
73use crate::protocol::auth::UserLoginInfo;
74use crate::protocol::constants::{Methods, Streams};
75use crate::protocol::headset::{
76 ConfigMappingListValue, ConfigMappingMode, ConfigMappingRequest, ConfigMappingResponse,
77 ConfigMappingValue, HeadsetClockSyncResult, HeadsetInfo, QueryHeadsetsOptions,
78};
79use crate::protocol::profiles::{CurrentProfileInfo, ProfileAction, ProfileInfo};
80use crate::protocol::records::{ExportFormat, MarkerInfo, RecordInfo, UpdateRecordRequest};
81use crate::protocol::rpc::{CortexRequest, CortexResponse};
82use crate::protocol::session::SessionInfo;
83use crate::protocol::subjects::{
84 DemographicAttribute, QuerySubjectsRequest, SubjectInfo, SubjectRequest,
85};
86use crate::protocol::training::{
87 DetectionInfo, DetectionType, FacialExpressionSignatureTypeRequest,
88 FacialExpressionThresholdRequest, MentalCommandTrainingThresholdRequest,
89 TrainedSignatureActions, TrainingStatus, TrainingTime,
90};
91
/// Upper bound on how long `CortexClient::connect` waits for the WebSocket
/// handshake before giving up.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

/// Capacity of every per-stream `mpsc` channel created for stream events.
const STREAM_CHANNEL_BUFFER: usize = 1024;
97
/// Result produced by `connect_websocket`: on success the established
/// WebSocket stream together with the HTTP handshake response, otherwise the
/// tungstenite error.
type ConnectOutput = Result<
    (
        WebSocketStream<MaybeTlsStream<TcpStream>>,
        tokio_tungstenite::tungstenite::handshake::client::Response,
    ),
    tokio_tungstenite::tungstenite::Error,
>;
105
/// Builds the future that performs the WebSocket handshake against `uri`.
///
/// `connector` is forwarded as the TLS connector; `None` lets
/// tokio-tungstenite pick its default. The future does nothing until polled.
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
fn connect_websocket(
    uri: http::Uri,
    connector: Option<Connector>,
) -> impl std::future::Future<Output = ConnectOutput> {
    // No custom WebSocket configuration is supplied.
    let websocket_config = None;
    // Third argument of `connect_async_tls_with_config`: disable Nagle's algorithm.
    let disable_nagle = true;

    connect_async_tls_with_config(uri, websocket_config, disable_nagle, connector)
}
117
118#[cfg(not(any(feature = "native-tls", feature = "rustls-tls")))]
119async fn connect_websocket(_uri: http::Uri, _connector: Option<Connector>) -> ConnectOutput {
120 Err(tokio_tungstenite::tungstenite::Error::Url(
121 UrlError::TlsFeatureNotEnabled,
122 ))
123}
124
/// Builds a native-tls connector for the connection to `url`.
///
/// Certificate validation is relaxed when
/// `config.should_accept_invalid_certs()` returns `true`.
///
/// # Errors
///
/// Returns `CortexError::ConnectionFailed` when the TLS connector cannot be
/// constructed.
#[cfg(all(feature = "native-tls", not(feature = "rustls-tls")))]
fn build_tls_connector(config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
    let accept_invalid = config.should_accept_invalid_certs();

    let mut builder = NativeTlsConnector::builder();
    builder.danger_accept_invalid_certs(accept_invalid);

    match builder.build() {
        Ok(tls_connector) => Ok(Some(Connector::NativeTls(tls_connector))),
        Err(e) => Err(CortexError::ConnectionFailed {
            url: url.to_string(),
            reason: format!("TLS configuration failed: {}", e),
        }),
    }
}
137
/// Builds the rustls connector for the connection to `url`.
///
/// Returns `Ok(None)` (library default TLS handling) unless
/// `config.should_accept_invalid_certs()` is `true`, in which case a client
/// configuration backed by `InsecureCertVerifier` is returned.
///
/// # Errors
///
/// Returns `CortexError::ConnectionFailed` when `url` does not parse as a URI.
#[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
fn build_tls_connector(config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
    // Validate the URL up front; the parsed value itself is not needed here.
    if let Err(e) = url.parse::<http::Uri>() {
        return Err(CortexError::ConnectionFailed {
            url: url.to_string(),
            reason: format!("Invalid URL: {e}"),
        });
    }

    if config.should_accept_invalid_certs() {
        let verifier = Arc::new(InsecureCertVerifier);
        let tls_config = rustls::ClientConfig::builder()
            .dangerous()
            .with_custom_certificate_verifier(verifier)
            .with_no_client_auth();

        Ok(Some(Connector::Rustls(Arc::new(tls_config))))
    } else {
        Ok(None)
    }
}
158
159#[cfg(any(
160 all(feature = "native-tls", feature = "rustls-tls"),
161 all(not(feature = "native-tls"), not(feature = "rustls-tls"))
162))]
163fn build_tls_connector(_config: &CortexConfig, url: &str) -> CortexResult<Option<Connector>> {
164 let _: http::Uri =
165 url.parse()
166 .map_err(|e: http::uri::InvalidUri| CortexError::ConnectionFailed {
167 url: url.to_string(),
168 reason: format!("Invalid URL: {e}"),
169 })?;
170 Ok(None)
171}
172
/// Certificate verifier that accepts every server certificate and handshake
/// signature without checking them.
///
/// Installed only when the configuration asks to accept invalid certificates.
#[cfg(feature = "rustls-tls")]
#[derive(Debug)]
struct InsecureCertVerifier;
176
#[cfg(feature = "rustls-tls")]
impl ServerCertVerifier for InsecureCertVerifier {
    /// Unconditionally reports the presented certificate chain as verified.
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> Result<ServerCertVerified, RustlsError> {
        let verified = ServerCertVerified::assertion();
        Ok(verified)
    }

    /// Unconditionally reports the TLS 1.2 handshake signature as valid.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, RustlsError> {
        let valid = HandshakeSignatureValid::assertion();
        Ok(valid)
    }

    /// Unconditionally reports the TLS 1.3 handshake signature as valid.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, RustlsError> {
        let valid = HandshakeSignatureValid::assertion();
        Ok(valid)
    }

    /// Signature schemes advertised to the server, in this order.
    fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
        const SCHEMES: [SignatureScheme; 9] = [
            SignatureScheme::ECDSA_NISTP384_SHA384,
            SignatureScheme::ECDSA_NISTP256_SHA256,
            SignatureScheme::RSA_PSS_SHA512,
            SignatureScheme::RSA_PSS_SHA384,
            SignatureScheme::RSA_PSS_SHA256,
            SignatureScheme::ED25519,
            SignatureScheme::RSA_PKCS1_SHA512,
            SignatureScheme::RSA_PKCS1_SHA384,
            SignatureScheme::RSA_PKCS1_SHA256,
        ];
        SCHEMES.to_vec()
    }
}
222
/// Write half of the split WebSocket stream.
type WsWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;

/// Read half of the split WebSocket stream.
type WsReader = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;

/// One-shot channel end used to hand an RPC result back to the waiting caller.
type PendingResponse = oneshot::Sender<CortexResult<serde_json::Value>>;

/// Sending ends of the stream-event channels, keyed by stream key.
pub type StreamSenders = HashMap<&'static str, mpsc::Sender<serde_json::Value>>;

/// Receiving ends of the stream-event channels, keyed by stream key.
pub type StreamReceivers = HashMap<&'static str, mpsc::Receiver<serde_json::Value>>;
237
/// Point-in-time copy of the dispatch counters of one stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StreamDispatchStats {
    /// Events successfully queued on the stream channel.
    pub delivered: u64,
    /// Events discarded because the stream channel was full.
    pub dropped_full: u64,
    /// Events discarded because the stream channel was closed.
    pub dropped_closed: u64,
}

/// Live atomic counters behind [`StreamDispatchStats`].
#[derive(Debug, Default)]
struct StreamDispatchCounters {
    delivered: AtomicU64,
    dropped_full: AtomicU64,
    dropped_closed: AtomicU64,
}

impl StreamDispatchCounters {
    /// Reads all three counters with relaxed ordering and returns them as a
    /// plain value. The fields are loaded one after another, not as a single
    /// atomic unit.
    fn snapshot(&self) -> StreamDispatchStats {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let Self {
            delivered,
            dropped_full,
            dropped_closed,
        } = self;

        StreamDispatchStats {
            delivered: read(delivered),
            dropped_full: read(dropped_full),
            dropped_closed: read(dropped_closed),
        }
    }
}
265
/// Shared dispatch counters, keyed by stream key.
type StreamDispatchCounterMap = HashMap<&'static str, Arc<StreamDispatchCounters>>;
267
/// JSON-RPC client that talks to the Cortex service over a single WebSocket.
///
/// Requests are written through `writer`; a background task reads incoming
/// messages and routes them either to the caller waiting on a matching request
/// id or to the registered stream channels.
pub struct CortexClient {
    /// Write half of the WebSocket, shared behind an async mutex.
    writer: Arc<Mutex<WsWriter>>,

    /// Callers waiting for an RPC response, keyed by request id.
    pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>>,

    /// Source of request ids; starts at 1 and is incremented per call.
    next_id: AtomicU64,

    /// Handle of the spawned reader task.
    reader_handle: Option<JoinHandle<()>>,

    /// Loop condition of the reader task; the task stores `false` when it exits.
    reader_running: Arc<AtomicBool>,

    /// Watch channel whose `true` value tells the reader task to stop.
    reader_shutdown: tokio::sync::watch::Sender<bool>,

    /// Registered stream senders; `None` until stream channels are created.
    stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>>,

    /// Per-stream delivered/dropped counters updated by the reader task.
    stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>>,

    /// Maximum time a single RPC call waits for its response.
    rpc_timeout: Duration,

    /// Instant captured at connect time; base of the monotonic time sent for
    /// headset clock synchronisation.
    clock_origin: Instant,
}
310
311impl CortexClient {
312 pub async fn connect(config: &CortexConfig) -> CortexResult<Self> {
336 let url = &config.cortex_url;
337 let rpc_timeout = Duration::from_secs(config.timeouts.rpc_timeout_secs);
338 let connector = build_tls_connector(config, url)?;
339
340 let uri: http::Uri =
342 url.parse()
343 .map_err(|e: http::uri::InvalidUri| CortexError::ConnectionFailed {
344 url: url.clone(),
345 reason: format!("Invalid URL: {e}"),
346 })?;
347
348 let connect_fut = connect_websocket(uri, connector);
349
350 let (ws, response) = tokio::time::timeout(CONNECT_TIMEOUT, connect_fut)
351 .await
352 .map_err(|_| CortexError::Timeout { seconds: 5 })?
353 .map_err(|e| CortexError::ConnectionFailed {
354 url: url.clone(),
355 reason: format!("WebSocket connection failed: {e}"),
356 })?;
357
358 tracing::info!(url, status = %response.status(), "Connected to Cortex API");
359
360 let (writer, reader) = ws.split();
362
363 let pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>> =
364 Arc::new(Mutex::new(HashMap::new()));
365
366 let reader_running = Arc::new(AtomicBool::new(true));
367 let (reader_shutdown, reader_shutdown_rx) = tokio::sync::watch::channel(false);
368 let stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>> =
369 Arc::new(std::sync::Mutex::new(None));
370 let stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>> =
371 Arc::new(std::sync::Mutex::new(HashMap::new()));
372
373 let reader_handle = Self::spawn_reader_loop(
376 reader,
377 Arc::clone(&pending_responses),
378 Arc::clone(&reader_running),
379 Arc::clone(&stream_senders),
380 Arc::clone(&stream_dispatch_counters),
381 reader_shutdown_rx,
382 );
383
384 Ok(Self {
385 writer: Arc::new(Mutex::new(writer)),
386 pending_responses,
387 next_id: AtomicU64::new(1),
388 reader_handle: Some(reader_handle),
389 reader_running,
390 reader_shutdown,
391 stream_senders,
392 stream_dispatch_counters,
393 rpc_timeout,
394 clock_origin: Instant::now(),
395 })
396 }
397
398 pub async fn connect_url(url: &str) -> CortexResult<Self> {
406 let config = CortexConfig {
407 client_id: String::new(),
408 client_secret: String::new(),
409 cortex_url: url.to_string(),
410 ..CortexConfig::new("", "")
411 };
412 Self::connect(&config).await
413 }
414
415 fn spawn_reader_loop(
417 mut reader: WsReader,
418 pending_responses: Arc<Mutex<HashMap<u64, PendingResponse>>>,
419 running: Arc<AtomicBool>,
420 stream_senders: Arc<std::sync::Mutex<Option<StreamSenders>>>,
421 stream_dispatch_counters: Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
422 mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
423 ) -> JoinHandle<()> {
424 tokio::spawn(async move {
425 while running.load(Ordering::SeqCst) {
426 let msg = tokio::select! {
427 msg = reader.next() => msg,
428 changed = shutdown_rx.changed() => {
429 match changed {
430 Ok(()) if *shutdown_rx.borrow() => break,
431 Ok(()) => continue,
432 Err(_) => break,
433 }
434 },
435 };
436
437 match msg {
438 Some(Ok(Message::Text(text))) => {
439 Self::handle_text_message(
440 &text,
441 &pending_responses,
442 &stream_senders,
443 &stream_dispatch_counters,
444 )
445 .await;
446 }
447 Some(Ok(Message::Close(_))) => {
448 tracing::info!("Cortex WebSocket closed by server");
449 Self::drain_pending_connection_lost(
450 &pending_responses,
451 "Cortex WebSocket closed",
452 )
453 .await;
454 break;
455 }
456 Some(Err(e)) => {
457 tracing::warn!("WebSocket read error: {}", e);
458 Self::drain_pending_websocket(
459 &pending_responses,
460 format!("WebSocket error: {e}"),
461 )
462 .await;
463 break;
464 }
465 None => {
466 tracing::info!("Cortex WebSocket stream ended");
467 break;
468 }
469 _ => {
470 }
472 }
473 }
474
475 Self::drain_pending_connection_lost(&pending_responses, "Reader loop stopped").await;
476
477 tracing::debug!("Reader loop exiting");
478 running.store(false, Ordering::SeqCst);
479 })
480 }
481
482 async fn handle_text_message(
483 text: &str,
484 pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
485 stream_senders: &Arc<std::sync::Mutex<Option<StreamSenders>>>,
486 stream_dispatch_counters: &Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
487 ) {
488 tracing::debug!(raw = %text, "Reader loop received message");
489
490 let value: serde_json::Value = match serde_json::from_str(text) {
491 Ok(v) => v,
492 Err(e) => {
493 tracing::warn!("Failed to parse WebSocket message as JSON: {}", e);
494 return;
495 }
496 };
497
498 if value
499 .get("id")
500 .and_then(serde_json::Value::as_u64)
501 .is_some()
502 {
503 let _ = Self::dispatch_rpc_response(value, pending_responses).await;
504 return;
505 }
506
507 Self::dispatch_stream_event(value, stream_senders, stream_dispatch_counters);
508 }
509
510 async fn dispatch_rpc_response(
511 value: serde_json::Value,
512 pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
513 ) -> bool {
514 let Some(id) = value.get("id").and_then(serde_json::Value::as_u64) else {
515 return false;
516 };
517
518 let response: std::result::Result<CortexResponse, _> = serde_json::from_value(value);
519
520 let mut pending = pending_responses.lock().await;
521 if let Some(tx) = pending.remove(&id) {
522 match response {
523 Ok(resp) => {
524 let result = if let Some(error) = resp.error {
525 tracing::error!(
526 id,
527 code = error.code,
528 message = %error.message,
529 "Cortex API error in RPC response",
530 );
531 Err(CortexError::from_api_error(error.code, error.message))
532 } else {
533 resp.result.ok_or_else(|| CortexError::ProtocolError {
534 reason: "Response has no result or error".into(),
535 })
536 };
537 let _ = tx.send(result);
538 }
539 Err(e) => {
540 let _ = tx.send(Err(CortexError::ProtocolError {
541 reason: format!("Failed to parse RPC response: {e}"),
542 }));
543 }
544 }
545 } else {
546 tracing::debug!(id, "Received response for unknown request ID");
547 }
548
549 true
550 }
551
552 fn dispatch_stream_event(
553 value: serde_json::Value,
554 stream_senders: &Arc<std::sync::Mutex<Option<StreamSenders>>>,
555 stream_dispatch_counters: &Arc<std::sync::Mutex<StreamDispatchCounterMap>>,
556 ) {
557 let target_sender = if let Ok(guard) = stream_senders.lock() {
558 guard.as_ref().and_then(|senders| {
559 senders
560 .iter()
561 .find_map(|(key, tx)| value.get(*key).is_some().then(|| (*key, tx.clone())))
562 })
563 } else {
564 None
565 };
566
567 if let Some((stream_key, tx)) = target_sender {
568 let counter = stream_dispatch_counters
569 .lock()
570 .ok()
571 .and_then(|counters| counters.get(stream_key).cloned());
572
573 match tx.try_send(value) {
574 Ok(()) => {
575 if let Some(counter) = counter {
576 counter.delivered.fetch_add(1, Ordering::Relaxed);
577 }
578 }
579 Err(TrySendError::Full(_)) => {
580 if let Some(counter) = counter {
581 counter.dropped_full.fetch_add(1, Ordering::Relaxed);
582 }
583 }
584 Err(TrySendError::Closed(_)) => {
585 if let Some(counter) = counter {
586 counter.dropped_closed.fetch_add(1, Ordering::Relaxed);
587 }
588 }
589 }
590 }
591 }
592
593 async fn drain_pending_connection_lost(
594 pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
595 reason: &str,
596 ) {
597 let mut pending = pending_responses.lock().await;
598 for (_, tx) in pending.drain() {
599 let _ = tx.send(Err(CortexError::ConnectionLost {
600 reason: reason.to_string(),
601 }));
602 }
603 }
604
605 async fn drain_pending_websocket(
606 pending_responses: &Arc<Mutex<HashMap<u64, PendingResponse>>>,
607 reason: String,
608 ) {
609 let mut pending = pending_responses.lock().await;
610 for (_, tx) in pending.drain() {
611 let _ = tx.send(Err(CortexError::WebSocket(reason.clone())));
612 }
613 }
614
615 async fn call(
619 &self,
620 method: &'static str,
621 params: serde_json::Value,
622 ) -> CortexResult<serde_json::Value> {
623 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
624 let request = CortexRequest::new(id, method, params);
625
626 let json = serde_json::to_string(&request).map_err(|e| CortexError::ProtocolError {
627 reason: format!("serialize error: {e}"),
628 })?;
629
630 tracing::debug!(method, id, json = %json, "Sending Cortex request");
631
632 let (tx, rx) = oneshot::channel();
634 {
635 let mut pending = self.pending_responses.lock().await;
636 pending.insert(id, tx);
637 }
638
639 let send_result = {
641 let mut writer = self.writer.lock().await;
642 writer.send(Message::Text(json.into())).await
643 };
644 if let Err(e) = send_result {
645 let mut pending = self.pending_responses.lock().await;
646 pending.remove(&id);
647 return Err(CortexError::WebSocket(format!("Send error: {e}")));
648 }
649
650 let timeout_secs = self.rpc_timeout.as_secs();
652 let result = match tokio::time::timeout(self.rpc_timeout, rx).await {
653 Ok(Ok(response)) => response,
654 Ok(Err(_)) => {
655 return Err(CortexError::ConnectionLost {
656 reason: "Response channel dropped (reader loop died)".into(),
657 });
658 }
659 Err(_) => {
660 self.pending_responses.lock().await.remove(&id);
661 return Err(CortexError::Timeout {
662 seconds: timeout_secs,
663 });
664 }
665 }?;
666
667 tracing::debug!(method, id, "Cortex RPC succeeded");
668 Ok(result)
669 }
670
671 fn query_headsets_params(options: QueryHeadsetsOptions) -> serde_json::Value {
672 let mut params = serde_json::json!({});
673 if let Some(id) = options.id {
674 params["id"] = serde_json::json!(id);
675 }
676 if options.include_flex_mappings {
677 params["includeFlexMappings"] = serde_json::json!(true);
678 }
679 params
680 }
681
682 fn sync_with_headset_clock_params(&self, headset_id: &str) -> CortexResult<serde_json::Value> {
683 let monotonic_time = self.clock_origin.elapsed().as_secs_f64();
684 let system_time = Self::current_epoch_millis()?;
685
686 Ok(Self::sync_with_headset_clock_params_with_times(
687 headset_id,
688 monotonic_time,
689 system_time,
690 ))
691 }
692
693 fn current_epoch_millis() -> CortexResult<u64> {
694 let system_duration = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
695 CortexError::ProtocolError {
696 reason: format!("System clock is before UNIX epoch: {e}"),
697 }
698 })?;
699
700 u64::try_from(system_duration.as_millis()).map_err(|_| CortexError::ProtocolError {
701 reason: "System time in milliseconds exceeds u64 range".into(),
702 })
703 }
704
705 fn sync_with_headset_clock_params_with_times(
706 headset_id: &str,
707 monotonic_time: f64,
708 system_time: u64,
709 ) -> serde_json::Value {
710 serde_json::json!({
711 "headset": headset_id,
712 "monotonicTime": monotonic_time,
713 "systemTime": system_time,
714 })
715 }
716
717 fn config_mapping_params(
718 cortex_token: &str,
719 request: ConfigMappingRequest,
720 ) -> CortexResult<(ConfigMappingMode, serde_json::Value)> {
721 let mut params = serde_json::json!({
722 "cortexToken": cortex_token,
723 "status": request.mode().as_str(),
724 });
725
726 match request {
727 ConfigMappingRequest::Create { name, mappings } => {
728 if name.trim().is_empty() {
729 return Err(CortexError::ProtocolError {
730 reason: "configMapping create requires non-empty name".into(),
731 });
732 }
733 if !mappings.is_object() {
734 return Err(CortexError::ProtocolError {
735 reason: "configMapping create requires mappings as an object".into(),
736 });
737 }
738 params["name"] = serde_json::json!(name);
739 params["mappings"] = mappings;
740 Ok((ConfigMappingMode::Create, params))
741 }
742 ConfigMappingRequest::Get => Ok((ConfigMappingMode::Get, params)),
743 ConfigMappingRequest::Read { uuid } => {
744 if uuid.trim().is_empty() {
745 return Err(CortexError::ProtocolError {
746 reason: "configMapping read requires non-empty uuid".into(),
747 });
748 }
749 params["uuid"] = serde_json::json!(uuid);
750 Ok((ConfigMappingMode::Read, params))
751 }
752 ConfigMappingRequest::Update {
753 uuid,
754 name,
755 mappings,
756 } => {
757 if uuid.trim().is_empty() {
758 return Err(CortexError::ProtocolError {
759 reason: "configMapping update requires non-empty uuid".into(),
760 });
761 }
762 if name.is_none() && mappings.is_none() {
763 return Err(CortexError::ProtocolError {
764 reason: "configMapping update requires name and/or mappings".into(),
765 });
766 }
767 if mappings.as_ref().is_some_and(|m| !m.is_object()) {
768 return Err(CortexError::ProtocolError {
769 reason: "configMapping update requires mappings as an object".into(),
770 });
771 }
772
773 params["uuid"] = serde_json::json!(uuid);
774 if let Some(value) = name {
775 if value.trim().is_empty() {
776 return Err(CortexError::ProtocolError {
777 reason: "configMapping update name must be non-empty".into(),
778 });
779 }
780 params["name"] = serde_json::json!(value);
781 }
782 if let Some(value) = mappings {
783 params["mappings"] = value;
784 }
785 Ok((ConfigMappingMode::Update, params))
786 }
787 ConfigMappingRequest::Delete { uuid } => {
788 if uuid.trim().is_empty() {
789 return Err(CortexError::ProtocolError {
790 reason: "configMapping delete requires non-empty uuid".into(),
791 });
792 }
793 params["uuid"] = serde_json::json!(uuid);
794 Ok((ConfigMappingMode::Delete, params))
795 }
796 }
797 }
798
799 fn update_headset_custom_info_params(
800 cortex_token: &str,
801 headset_id: &str,
802 headband_position: Option<&str>,
803 custom_name: Option<&str>,
804 ) -> serde_json::Value {
805 let mut params = serde_json::json!({
806 "cortexToken": cortex_token,
807 "headsetId": headset_id,
808 "headset": headset_id,
810 });
811
812 if let Some(pos) = headband_position {
813 params["headbandPosition"] = serde_json::json!(pos);
814 }
815 if let Some(name) = custom_name {
816 params["customName"] = serde_json::json!(name);
817 }
818 params
819 }
820
821 fn mental_command_training_threshold_params(
822 cortex_token: &str,
823 session_id: Option<&str>,
824 profile: Option<&str>,
825 status: Option<&str>,
826 value: Option<f64>,
827 ) -> CortexResult<serde_json::Value> {
828 match (session_id, profile) {
829 (Some(_), Some(_)) => {
830 return Err(CortexError::ProtocolError {
831 reason: "Specify either session_id or profile, not both".into(),
832 });
833 }
834 (None, None) => {
835 return Err(CortexError::ProtocolError {
836 reason: "Specify either session_id or profile".into(),
837 });
838 }
839 _ => {}
840 }
841
842 let inferred_status = status.unwrap_or(if value.is_some() { "set" } else { "get" });
843
844 let mut params = serde_json::json!({
845 "cortexToken": cortex_token,
846 "status": inferred_status,
847 });
848
849 if let Some(session) = session_id {
850 params["session"] = serde_json::json!(session);
851 }
852 if let Some(profile_name) = profile {
853 params["profile"] = serde_json::json!(profile_name);
854 }
855 if let Some(threshold) = value {
856 params["value"] = serde_json::json!(threshold);
857 }
858 Ok(params)
859 }
860
861 fn subject_params(cortex_token: &str, request: &SubjectRequest) -> serde_json::Value {
862 let mut params = serde_json::json!({
863 "cortexToken": cortex_token,
864 "subjectName": request.subject_name.as_str(),
865 });
866
867 if let Some(dob) = &request.date_of_birth {
868 params["dateOfBirth"] = serde_json::json!(dob);
869 }
870 if let Some(sex) = &request.sex {
871 params["sex"] = serde_json::json!(sex);
872 }
873 if let Some(country_code) = &request.country_code {
874 params["countryCode"] = serde_json::json!(country_code);
875 }
876 if let Some(state) = &request.state {
877 params["state"] = serde_json::json!(state);
878 }
879 if let Some(city) = &request.city {
880 params["city"] = serde_json::json!(city);
881 }
882 if let Some(attributes) = &request.attributes {
883 params["attributes"] = serde_json::json!(attributes);
884 }
885
886 params
887 }
888
889 fn query_subjects_params(
890 cortex_token: &str,
891 request: &QuerySubjectsRequest,
892 ) -> serde_json::Value {
893 let mut params = serde_json::json!({
894 "cortexToken": cortex_token,
895 "query": request.query.clone(),
896 "orderBy": request.order_by.clone(),
897 });
898
899 if let Some(limit) = request.limit {
900 params["limit"] = serde_json::json!(limit);
901 }
902 if let Some(offset) = request.offset {
903 params["offset"] = serde_json::json!(offset);
904 }
905
906 params
907 }
908
909 fn facial_expression_signature_type_params(
910 cortex_token: &str,
911 request: &FacialExpressionSignatureTypeRequest,
912 ) -> serde_json::Value {
913 let mut params = serde_json::json!({
914 "cortexToken": cortex_token,
915 "status": request.status.as_str(),
916 });
917
918 if let Some(profile) = &request.profile {
919 params["profile"] = serde_json::json!(profile);
920 }
921 if let Some(session) = &request.session {
922 params["session"] = serde_json::json!(session);
923 }
924 if let Some(signature) = &request.signature {
925 params["signature"] = serde_json::json!(signature);
926 }
927
928 params
929 }
930
931 fn facial_expression_threshold_params(
932 cortex_token: &str,
933 request: &FacialExpressionThresholdRequest,
934 ) -> serde_json::Value {
935 let mut params = serde_json::json!({
936 "cortexToken": cortex_token,
937 "status": request.status.as_str(),
938 "action": request.action.as_str(),
939 });
940
941 if let Some(profile) = &request.profile {
942 params["profile"] = serde_json::json!(profile);
943 }
944 if let Some(session) = &request.session {
945 params["session"] = serde_json::json!(session);
946 }
947 if let Some(value) = request.value {
948 params["value"] = serde_json::json!(value);
949 }
950
951 params
952 }
953
954 fn stream_key(name: &str) -> &'static str {
958 match name {
959 Streams::EEG => "eeg",
960 Streams::DEV => "dev",
961 Streams::MOT => "mot",
962 Streams::EQ => "eq",
963 Streams::POW => "pow",
964 Streams::MET => "met",
965 Streams::COM => "com",
966 Streams::FAC => "fac",
967 Streams::SYS => "sys",
968 other => {
969 tracing::warn!(stream = other, "Unknown stream type");
970 "unknown"
971 }
972 }
973 }
974
975 pub fn create_stream_channels(&self, streams: &[&str]) -> StreamReceivers {
980 let mut senders = StreamSenders::new();
981 let mut receivers = StreamReceivers::new();
982 let mut counters = StreamDispatchCounterMap::new();
983
984 for &stream in streams {
985 let stream_key = Self::stream_key(stream);
986 let (tx, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
987 senders.insert(stream_key, tx);
988 receivers.insert(stream_key, rx);
989 counters.insert(stream_key, Arc::new(StreamDispatchCounters::default()));
990 }
991
992 if let Ok(mut guard) = self.stream_senders.lock() {
993 *guard = Some(senders);
994 }
995 if let Ok(mut guard) = self.stream_dispatch_counters.lock() {
996 *guard = counters;
997 }
998
999 receivers
1000 }
1001
1002 pub fn add_stream_channel(&self, stream: &str) -> Option<mpsc::Receiver<serde_json::Value>> {
1006 let stream_key = Self::stream_key(stream);
1007 let (tx, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
1008 if let Ok(mut guard) = self.stream_senders.lock() {
1009 let senders = guard.get_or_insert_with(StreamSenders::new);
1010 senders.insert(stream_key, tx);
1011 if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1012 counters
1013 .entry(stream_key)
1014 .or_insert_with(|| Arc::new(StreamDispatchCounters::default()));
1015 }
1016 Some(rx)
1017 } else {
1018 None
1019 }
1020 }
1021
1022 pub fn remove_stream_channel(&self, stream: &str) {
1024 if let Ok(mut guard) = self.stream_senders.lock() {
1025 if let Some(ref mut senders) = *guard {
1026 let stream_key = Self::stream_key(stream);
1027 senders.remove(stream_key);
1028 if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1029 counters.remove(stream_key);
1030 }
1031 }
1032 }
1033 }
1034
1035 pub fn clear_stream_channels(&self) {
1037 if let Ok(mut guard) = self.stream_senders.lock() {
1038 *guard = None;
1039 }
1040 if let Ok(mut counters) = self.stream_dispatch_counters.lock() {
1041 counters.clear();
1042 }
1043 }
1044
1045 pub fn stream_dispatch_stats(&self) -> HashMap<&'static str, StreamDispatchStats> {
1047 if let Ok(counters) = self.stream_dispatch_counters.lock() {
1048 counters
1049 .iter()
1050 .map(|(stream, counter)| (*stream, counter.snapshot()))
1051 .collect()
1052 } else {
1053 HashMap::new()
1054 }
1055 }
1056
1057 pub async fn pending_response_count(&self) -> usize {
1059 self.pending_responses.lock().await.len()
1060 }
1061
1062 pub async fn get_cortex_info(&self) -> CortexResult<serde_json::Value> {
1072 self.call(Methods::GET_CORTEX_INFO, serde_json::json!({}))
1073 .await
1074 }
1075
1076 pub async fn has_access_right(
1082 &self,
1083 client_id: &str,
1084 client_secret: &str,
1085 ) -> CortexResult<bool> {
1086 let result = self
1087 .call(
1088 Methods::HAS_ACCESS_RIGHT,
1089 serde_json::json!({
1090 "clientId": client_id,
1091 "clientSecret": client_secret,
1092 }),
1093 )
1094 .await?;
1095
1096 Ok(result
1097 .get("accessGranted")
1098 .and_then(serde_json::Value::as_bool)
1099 .unwrap_or(false))
1100 }
1101
1102 pub async fn get_user_login(&self) -> CortexResult<Vec<UserLoginInfo>> {
1108 let result = self
1109 .call(Methods::GET_USER_LOGIN, serde_json::json!({}))
1110 .await?;
1111
1112 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1113 reason: format!("Failed to parse user login info: {e}"),
1114 })
1115 }
1116
1117 pub async fn authenticate(&self, client_id: &str, client_secret: &str) -> CortexResult<String> {
1127 let cortex_info_ok = match self.get_cortex_info().await {
1129 Ok(info) => {
1130 tracing::info!("Cortex API info: {}", info);
1131 true
1132 }
1133 Err(e) => {
1134 tracing::warn!("getCortexInfo failed (continuing anyway): {}", e);
1135 false
1136 }
1137 };
1138
1139 match self
1141 .call(
1142 Methods::REQUEST_ACCESS,
1143 serde_json::json!({
1144 "clientId": client_id,
1145 "clientSecret": client_secret,
1146 }),
1147 )
1148 .await
1149 {
1150 Ok(_) => tracing::debug!("Cortex access requested"),
1151 Err(CortexError::MethodNotFound { .. }) => {
1152 tracing::info!(
1153 "requestAccess not available on this Cortex version \
1154 (Launcher handles app approval directly)"
1155 );
1156 }
1157 Err(e) => return Err(e),
1158 }
1159
1160 let auth_result = match self
1162 .call(
1163 Methods::AUTHORIZE,
1164 serde_json::json!({
1165 "clientId": client_id,
1166 "clientSecret": client_secret,
1167 }),
1168 )
1169 .await
1170 {
1171 Ok(result) => result,
1172 Err(CortexError::MethodNotFound { .. }) => {
1173 if !cortex_info_ok {
1174 tracing::error!(
1175 "Both getCortexInfo and authorize returned 'Method not found'. \
1176 The service may not be the Emotiv Cortex API, or may be incompatible."
1177 );
1178 }
1179 return Err(CortexError::AuthenticationFailed {
1180 reason: "Cortex API 'authorize' method not found (-32601). \
1181 Check that the EMOTIV Launcher is running and you are logged in."
1182 .into(),
1183 });
1184 }
1185 Err(e) => return Err(e),
1186 };
1187
1188 let cortex_token = auth_result
1189 .get("cortexToken")
1190 .and_then(|v| v.as_str())
1191 .ok_or_else(|| CortexError::ProtocolError {
1192 reason: "authorize response missing cortexToken".into(),
1193 })?
1194 .to_string();
1195
1196 tracing::info!("Cortex authentication successful");
1197
1198 Ok(cortex_token)
1199 }
1200
1201 pub async fn generate_new_token(
1209 &self,
1210 cortex_token: &str,
1211 client_id: &str,
1212 client_secret: &str,
1213 ) -> CortexResult<String> {
1214 let result = self
1215 .call(
1216 Methods::GENERATE_NEW_TOKEN,
1217 serde_json::json!({
1218 "cortexToken": cortex_token,
1219 "clientId": client_id,
1220 "clientSecret": client_secret,
1221 }),
1222 )
1223 .await?;
1224
1225 result
1226 .get("cortexToken")
1227 .and_then(|v| v.as_str())
1228 .map(std::string::ToString::to_string)
1229 .ok_or_else(|| CortexError::ProtocolError {
1230 reason: "generateNewToken response missing cortexToken".into(),
1231 })
1232 }
1233
1234 pub async fn get_user_info(&self, cortex_token: &str) -> CortexResult<serde_json::Value> {
1240 self.call(
1241 Methods::GET_USER_INFO,
1242 serde_json::json!({
1243 "cortexToken": cortex_token,
1244 }),
1245 )
1246 .await
1247 }
1248
1249 pub async fn get_license_info(&self, cortex_token: &str) -> CortexResult<serde_json::Value> {
1255 self.call(
1256 Methods::GET_LICENSE_INFO,
1257 serde_json::json!({
1258 "cortexToken": cortex_token,
1259 }),
1260 )
1261 .await
1262 }
1263
1264 pub async fn query_headsets(
1272 &self,
1273 options: QueryHeadsetsOptions,
1274 ) -> CortexResult<Vec<HeadsetInfo>> {
1275 let result = self
1276 .call(
1277 Methods::QUERY_HEADSETS,
1278 Self::query_headsets_params(options),
1279 )
1280 .await?;
1281
1282 let headsets: Vec<HeadsetInfo> =
1283 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1284 reason: format!("Failed to parse headset list: {e}"),
1285 })?;
1286
1287 tracing::info!(count = headsets.len(), "Queried headsets");
1288
1289 Ok(headsets)
1290 }
1291
1292 pub async fn connect_headset(&self, headset_id: &str) -> CortexResult<()> {
1298 self.call(
1299 Methods::CONTROL_DEVICE,
1300 serde_json::json!({
1301 "command": "connect",
1302 "headset": headset_id,
1303 }),
1304 )
1305 .await?;
1306
1307 tracing::info!(headset = headset_id, "Headset connection initiated");
1308 Ok(())
1309 }
1310
1311 pub async fn disconnect_headset(&self, headset_id: &str) -> CortexResult<()> {
1317 self.call(
1318 Methods::CONTROL_DEVICE,
1319 serde_json::json!({
1320 "command": "disconnect",
1321 "headset": headset_id,
1322 }),
1323 )
1324 .await?;
1325
1326 tracing::info!(headset = headset_id, "Headset disconnection initiated");
1327 Ok(())
1328 }
1329
1330 pub async fn refresh_headsets(&self) -> CortexResult<()> {
1336 self.call(
1337 Methods::CONTROL_DEVICE,
1338 serde_json::json!({
1339 "command": "refresh",
1340 }),
1341 )
1342 .await?;
1343
1344 tracing::debug!("Headset refresh/scan triggered");
1345 Ok(())
1346 }
1347
1348 pub async fn sync_with_headset_clock(
1361 &self,
1362 headset_id: &str,
1363 ) -> CortexResult<HeadsetClockSyncResult> {
1364 let result = self
1365 .call(
1366 Methods::SYNC_WITH_HEADSET_CLOCK,
1367 self.sync_with_headset_clock_params(headset_id)?,
1368 )
1369 .await?;
1370
1371 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1372 reason: format!("Failed to parse headset clock sync result: {e}"),
1373 })
1374 }
1375
1376 pub async fn config_mapping(
1382 &self,
1383 cortex_token: &str,
1384 request: ConfigMappingRequest,
1385 ) -> CortexResult<ConfigMappingResponse> {
1386 let (mode, params) = Self::config_mapping_params(cortex_token, request)?;
1387 let result = self.call(Methods::CONFIG_MAPPING, params).await?;
1388
1389 match mode {
1390 ConfigMappingMode::Create | ConfigMappingMode::Read | ConfigMappingMode::Update => {
1391 #[derive(serde::Deserialize)]
1392 struct ValueEnvelope {
1393 message: String,
1394 value: ConfigMappingValue,
1395 }
1396 let parsed: ValueEnvelope =
1397 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1398 reason: format!("Failed to parse configMapping value response: {e}"),
1399 })?;
1400 Ok(ConfigMappingResponse::Value {
1401 message: parsed.message,
1402 value: parsed.value,
1403 })
1404 }
1405 ConfigMappingMode::Get => {
1406 #[derive(serde::Deserialize)]
1407 struct ListEnvelope {
1408 message: String,
1409 value: ConfigMappingListValue,
1410 }
1411 let parsed: ListEnvelope =
1412 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1413 reason: format!("Failed to parse configMapping get response: {e}"),
1414 })?;
1415 Ok(ConfigMappingResponse::List {
1416 message: parsed.message,
1417 value: parsed.value,
1418 })
1419 }
1420 ConfigMappingMode::Delete => {
1421 #[derive(serde::Deserialize)]
1422 struct DeleteEnvelope {
1423 message: String,
1424 uuid: String,
1425 }
1426 let parsed: DeleteEnvelope =
1427 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1428 reason: format!("Failed to parse configMapping delete response: {e}"),
1429 })?;
1430 Ok(ConfigMappingResponse::Deleted {
1431 message: parsed.message,
1432 uuid: parsed.uuid,
1433 })
1434 }
1435 }
1436 }
1437
1438 pub async fn update_headset(
1457 &self,
1458 cortex_token: &str,
1459 headset_id: &str,
1460 setting: serde_json::Value,
1461 ) -> CortexResult<serde_json::Value> {
1462 self.call(
1463 Methods::UPDATE_HEADSET,
1464 serde_json::json!({
1465 "cortexToken": cortex_token,
1466 "headset": headset_id,
1467 "setting": setting,
1468 }),
1469 )
1470 .await
1471 }
1472
1473 pub async fn update_headset_custom_info(
1491 &self,
1492 cortex_token: &str,
1493 headset_id: &str,
1494 headband_position: Option<&str>,
1495 custom_name: Option<&str>,
1496 ) -> CortexResult<serde_json::Value> {
1497 self.call(
1498 Methods::UPDATE_HEADSET_CUSTOM_INFO,
1499 Self::update_headset_custom_info_params(
1500 cortex_token,
1501 headset_id,
1502 headband_position,
1503 custom_name,
1504 ),
1505 )
1506 .await
1507 }
1508
1509 pub async fn create_session(
1517 &self,
1518 cortex_token: &str,
1519 headset_id: &str,
1520 ) -> CortexResult<SessionInfo> {
1521 let result = self
1522 .call(
1523 Methods::CREATE_SESSION,
1524 serde_json::json!({
1525 "cortexToken": cortex_token,
1526 "headset": headset_id,
1527 "status": "active",
1528 }),
1529 )
1530 .await?;
1531
1532 let session: SessionInfo =
1533 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1534 reason: format!("Failed to parse session info: {e}"),
1535 })?;
1536
1537 tracing::info!(session_id = %session.id, "Session created");
1538 Ok(session)
1539 }
1540
1541 pub async fn query_sessions(&self, cortex_token: &str) -> CortexResult<Vec<SessionInfo>> {
1547 let result = self
1548 .call(
1549 Methods::QUERY_SESSIONS,
1550 serde_json::json!({
1551 "cortexToken": cortex_token,
1552 }),
1553 )
1554 .await?;
1555
1556 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
1557 reason: format!("Failed to parse sessions: {e}"),
1558 })
1559 }
1560
1561 pub async fn close_session(&self, cortex_token: &str, session_id: &str) -> CortexResult<()> {
1574 self.call(
1575 Methods::UPDATE_SESSION,
1576 serde_json::json!({
1577 "cortexToken": cortex_token,
1578 "session": session_id,
1579 "status": "close",
1580 }),
1581 )
1582 .await?;
1583
1584 tracing::info!(session_id, "Session closed");
1585 Ok(())
1586 }
1587
1588 pub async fn subscribe_streams(
1596 &self,
1597 cortex_token: &str,
1598 session_id: &str,
1599 streams: &[&str],
1600 ) -> CortexResult<serde_json::Value> {
1601 let resp = self
1602 .call(
1603 Methods::SUBSCRIBE,
1604 serde_json::json!({
1605 "cortexToken": cortex_token,
1606 "session": session_id,
1607 "streams": streams,
1608 }),
1609 )
1610 .await?;
1611
1612 tracing::info!(session_id, ?streams, "Subscribed to data streams");
1613 Ok(resp)
1614 }
1615
1616 pub async fn unsubscribe_streams(
1622 &self,
1623 cortex_token: &str,
1624 session_id: &str,
1625 streams: &[&str],
1626 ) -> CortexResult<()> {
1627 self.call(
1628 Methods::UNSUBSCRIBE,
1629 serde_json::json!({
1630 "cortexToken": cortex_token,
1631 "session": session_id,
1632 "streams": streams,
1633 }),
1634 )
1635 .await?;
1636
1637 tracing::info!(session_id, ?streams, "Unsubscribed from data streams");
1638 Ok(())
1639 }
1640
1641 pub async fn create_record(
1649 &self,
1650 cortex_token: &str,
1651 session_id: &str,
1652 title: &str,
1653 ) -> CortexResult<RecordInfo> {
1654 let result = self
1655 .call(
1656 Methods::CREATE_RECORD,
1657 serde_json::json!({
1658 "cortexToken": cortex_token,
1659 "session": session_id,
1660 "title": title,
1661 }),
1662 )
1663 .await?;
1664
1665 let record_value =
1666 result
1667 .get("record")
1668 .cloned()
1669 .ok_or_else(|| CortexError::ProtocolError {
1670 reason: "createRecord response missing 'record' field".into(),
1671 })?;
1672
1673 let record: RecordInfo =
1674 serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1675 reason: format!("Failed to parse record info: {e}"),
1676 })?;
1677
1678 tracing::info!(record_id = %record.uuid, "Recording started");
1679 Ok(record)
1680 }
1681
1682 pub async fn stop_record(
1688 &self,
1689 cortex_token: &str,
1690 session_id: &str,
1691 ) -> CortexResult<RecordInfo> {
1692 let result = self
1693 .call(
1694 Methods::STOP_RECORD,
1695 serde_json::json!({
1696 "cortexToken": cortex_token,
1697 "session": session_id,
1698 }),
1699 )
1700 .await?;
1701
1702 let record_value =
1703 result
1704 .get("record")
1705 .cloned()
1706 .ok_or_else(|| CortexError::ProtocolError {
1707 reason: "stopRecord response missing 'record' field".into(),
1708 })?;
1709
1710 let record: RecordInfo =
1711 serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1712 reason: format!("Failed to parse record info: {e}"),
1713 })?;
1714
1715 tracing::info!(record_id = %record.uuid, "Recording stopped");
1716 Ok(record)
1717 }
1718
1719 pub async fn query_records(
1725 &self,
1726 cortex_token: &str,
1727 limit: Option<u32>,
1728 offset: Option<u32>,
1729 ) -> CortexResult<Vec<RecordInfo>> {
1730 let mut params = serde_json::json!({
1731 "cortexToken": cortex_token,
1732 "query": {},
1733 "orderBy": [{ "startDatetime": "DESC" }],
1734 });
1735
1736 if let Some(limit) = limit {
1737 params["limit"] = serde_json::json!(limit);
1738 }
1739 if let Some(offset) = offset {
1740 params["offset"] = serde_json::json!(offset);
1741 }
1742
1743 let result = self.call(Methods::QUERY_RECORDS, params).await?;
1744
1745 let records = result
1746 .get("records")
1747 .cloned()
1748 .unwrap_or(serde_json::Value::Array(vec![]));
1749
1750 serde_json::from_value(records).map_err(|e| CortexError::ProtocolError {
1751 reason: format!("Failed to parse records: {e}"),
1752 })
1753 }
1754
1755 pub async fn export_record(
1761 &self,
1762 cortex_token: &str,
1763 record_ids: &[String],
1764 folder: &str,
1765 format: ExportFormat,
1766 ) -> CortexResult<()> {
1767 self.call(
1768 Methods::EXPORT_RECORD,
1769 serde_json::json!({
1770 "cortexToken": cortex_token,
1771 "recordIds": record_ids,
1772 "folder": folder,
1773 "format": format.as_str(),
1774 }),
1775 )
1776 .await?;
1777
1778 tracing::info!(
1779 ?record_ids,
1780 folder,
1781 format = format.as_str(),
1782 "Export initiated"
1783 );
1784 Ok(())
1785 }
1786
1787 pub async fn update_record_with(
1793 &self,
1794 cortex_token: &str,
1795 request: &UpdateRecordRequest,
1796 ) -> CortexResult<RecordInfo> {
1797 let mut params = serde_json::json!({
1798 "cortexToken": cortex_token,
1799 "record": request.record_id.as_str(),
1800 });
1801
1802 if let Some(t) = &request.title {
1803 params["title"] = serde_json::json!(t);
1804 }
1805 if let Some(d) = &request.description {
1806 params["description"] = serde_json::json!(d);
1807 }
1808 if let Some(t) = &request.tags {
1809 params["tags"] = serde_json::json!(t);
1810 }
1811
1812 let result = self.call(Methods::UPDATE_RECORD, params).await?;
1813
1814 let record_value =
1815 result
1816 .get("record")
1817 .cloned()
1818 .ok_or_else(|| CortexError::ProtocolError {
1819 reason: "updateRecord response missing 'record' field".into(),
1820 })?;
1821
1822 serde_json::from_value(record_value).map_err(|e| CortexError::ProtocolError {
1823 reason: format!("Failed to parse record info: {e}"),
1824 })
1825 }
1826
1827 #[deprecated(note = "Use `update_record_with` and `UpdateRecordRequest` instead.")]
1829 pub async fn update_record(
1834 &self,
1835 cortex_token: &str,
1836 record_id: &str,
1837 title: Option<&str>,
1838 description: Option<&str>,
1839 tags: Option<&[String]>,
1840 ) -> CortexResult<RecordInfo> {
1841 let request = UpdateRecordRequest {
1842 record_id: record_id.to_string(),
1843 title: title.map(ToString::to_string),
1844 description: description.map(ToString::to_string),
1845 tags: tags.map(<[std::string::String]>::to_vec),
1846 };
1847 self.update_record_with(cortex_token, &request).await
1848 }
1849
1850 pub async fn delete_record(
1856 &self,
1857 cortex_token: &str,
1858 record_ids: &[String],
1859 ) -> CortexResult<serde_json::Value> {
1860 self.call(
1861 Methods::DELETE_RECORD,
1862 serde_json::json!({
1863 "cortexToken": cortex_token,
1864 "records": record_ids,
1865 }),
1866 )
1867 .await
1868 }
1869
1870 pub async fn get_record_infos(
1876 &self,
1877 cortex_token: &str,
1878 record_ids: &[String],
1879 ) -> CortexResult<serde_json::Value> {
1880 self.call(
1881 Methods::GET_RECORD_INFOS,
1882 serde_json::json!({
1883 "cortexToken": cortex_token,
1884 "recordIds": record_ids,
1885 }),
1886 )
1887 .await
1888 }
1889
1890 pub async fn config_opt_out(
1898 &self,
1899 cortex_token: &str,
1900 status: &str,
1901 new_opt_out: Option<bool>,
1902 ) -> CortexResult<serde_json::Value> {
1903 let mut params = serde_json::json!({
1904 "cortexToken": cortex_token,
1905 "status": status,
1906 });
1907
1908 if let Some(opt) = new_opt_out {
1909 params["newOptOut"] = serde_json::json!(opt);
1910 }
1911
1912 self.call(Methods::CONFIG_OPT_OUT, params).await
1913 }
1914
1915 pub async fn download_record(
1921 &self,
1922 cortex_token: &str,
1923 record_ids: &[String],
1924 ) -> CortexResult<serde_json::Value> {
1925 self.call(
1926 Methods::DOWNLOAD_RECORD,
1927 serde_json::json!({
1928 "cortexToken": cortex_token,
1929 "recordIds": record_ids,
1930 }),
1931 )
1932 .await
1933 }
1934
1935 pub async fn inject_marker(
1943 &self,
1944 cortex_token: &str,
1945 session_id: &str,
1946 label: &str,
1947 value: i32,
1948 port: &str,
1949 time: Option<f64>,
1950 ) -> CortexResult<MarkerInfo> {
1951 let epoch_ms = match time {
1952 Some(value) => value,
1953 None => Self::current_epoch_millis()?
1954 .to_string()
1955 .parse::<f64>()
1956 .map_err(|e| CortexError::ProtocolError {
1957 reason: format!("Failed to convert epoch milliseconds to f64: {e}"),
1958 })?,
1959 };
1960
1961 let params = serde_json::json!({
1962 "cortexToken": cortex_token,
1963 "session": session_id,
1964 "label": label,
1965 "value": value,
1966 "port": port,
1967 "time": epoch_ms,
1968 });
1969
1970 let result = self.call(Methods::INJECT_MARKER, params).await?;
1971
1972 let marker_value =
1973 result
1974 .get("marker")
1975 .cloned()
1976 .ok_or_else(|| CortexError::ProtocolError {
1977 reason: "injectMarker response missing 'marker' field".into(),
1978 })?;
1979
1980 let marker: MarkerInfo =
1981 serde_json::from_value(marker_value).map_err(|e| CortexError::ProtocolError {
1982 reason: format!("Failed to parse marker info: {e}"),
1983 })?;
1984
1985 tracing::debug!(marker_id = %marker.uuid, label, "Marker injected");
1986 Ok(marker)
1987 }
1988
1989 pub async fn update_marker(
1995 &self,
1996 cortex_token: &str,
1997 session_id: &str,
1998 marker_id: &str,
1999 time: Option<f64>,
2000 ) -> CortexResult<()> {
2001 let mut params = serde_json::json!({
2002 "cortexToken": cortex_token,
2003 "session": session_id,
2004 "markerId": marker_id,
2005 });
2006
2007 if let Some(t) = time {
2008 params["time"] = serde_json::json!(t);
2009 }
2010
2011 self.call(Methods::UPDATE_MARKER, params).await?;
2012 tracing::debug!(marker_id, "Marker updated");
2013 Ok(())
2014 }
2015
2016 pub async fn create_subject_with(
2024 &self,
2025 cortex_token: &str,
2026 request: &SubjectRequest,
2027 ) -> CortexResult<SubjectInfo> {
2028 let result = self
2029 .call(
2030 Methods::CREATE_SUBJECT,
2031 Self::subject_params(cortex_token, request),
2032 )
2033 .await?;
2034
2035 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2036 reason: format!("Failed to parse subject info: {e}"),
2037 })
2038 }
2039
2040 #[deprecated(note = "Use `create_subject_with` and `SubjectRequest` instead.")]
2042 #[allow(clippy::too_many_arguments)]
2043 pub async fn create_subject(
2048 &self,
2049 cortex_token: &str,
2050 subject_name: &str,
2051 date_of_birth: Option<&str>,
2052 sex: Option<&str>,
2053 country_code: Option<&str>,
2054 state: Option<&str>,
2055 city: Option<&str>,
2056 attributes: Option<&[serde_json::Value]>,
2057 ) -> CortexResult<SubjectInfo> {
2058 let request = SubjectRequest {
2059 subject_name: subject_name.to_string(),
2060 date_of_birth: date_of_birth.map(ToString::to_string),
2061 sex: sex.map(ToString::to_string),
2062 country_code: country_code.map(ToString::to_string),
2063 state: state.map(ToString::to_string),
2064 city: city.map(ToString::to_string),
2065 attributes: attributes.map(<[serde_json::Value]>::to_vec),
2066 };
2067 self.create_subject_with(cortex_token, &request).await
2068 }
2069
2070 pub async fn update_subject_with(
2076 &self,
2077 cortex_token: &str,
2078 request: &SubjectRequest,
2079 ) -> CortexResult<SubjectInfo> {
2080 let result = self
2081 .call(
2082 Methods::UPDATE_SUBJECT,
2083 Self::subject_params(cortex_token, request),
2084 )
2085 .await?;
2086
2087 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2088 reason: format!("Failed to parse subject info: {e}"),
2089 })
2090 }
2091
2092 #[deprecated(note = "Use `update_subject_with` and `SubjectRequest` instead.")]
2094 #[allow(clippy::too_many_arguments)]
2095 pub async fn update_subject(
2100 &self,
2101 cortex_token: &str,
2102 subject_name: &str,
2103 date_of_birth: Option<&str>,
2104 sex: Option<&str>,
2105 country_code: Option<&str>,
2106 state: Option<&str>,
2107 city: Option<&str>,
2108 attributes: Option<&[serde_json::Value]>,
2109 ) -> CortexResult<SubjectInfo> {
2110 let request = SubjectRequest {
2111 subject_name: subject_name.to_string(),
2112 date_of_birth: date_of_birth.map(ToString::to_string),
2113 sex: sex.map(ToString::to_string),
2114 country_code: country_code.map(ToString::to_string),
2115 state: state.map(ToString::to_string),
2116 city: city.map(ToString::to_string),
2117 attributes: attributes.map(<[serde_json::Value]>::to_vec),
2118 };
2119 self.update_subject_with(cortex_token, &request).await
2120 }
2121
2122 pub async fn delete_subjects(
2128 &self,
2129 cortex_token: &str,
2130 subject_names: &[String],
2131 ) -> CortexResult<serde_json::Value> {
2132 self.call(
2133 Methods::DELETE_SUBJECTS,
2134 serde_json::json!({
2135 "cortexToken": cortex_token,
2136 "subjects": subject_names,
2137 }),
2138 )
2139 .await
2140 }
2141
2142 pub async fn query_subjects_with(
2150 &self,
2151 cortex_token: &str,
2152 request: &QuerySubjectsRequest,
2153 ) -> CortexResult<(Vec<SubjectInfo>, u32)> {
2154 let result = self
2155 .call(
2156 Methods::QUERY_SUBJECTS,
2157 Self::query_subjects_params(cortex_token, request),
2158 )
2159 .await?;
2160
2161 let count = result
2162 .get("count")
2163 .and_then(serde_json::Value::as_u64)
2164 .and_then(|value| u32::try_from(value).ok())
2165 .unwrap_or(0);
2166
2167 let subjects_value = result
2168 .get("subjects")
2169 .cloned()
2170 .unwrap_or(serde_json::Value::Array(vec![]));
2171
2172 let subjects: Vec<SubjectInfo> =
2173 serde_json::from_value(subjects_value).map_err(|e| CortexError::ProtocolError {
2174 reason: format!("Failed to parse subjects: {e}"),
2175 })?;
2176
2177 Ok((subjects, count))
2178 }
2179
2180 #[deprecated(note = "Use `query_subjects_with` and `QuerySubjectsRequest` instead.")]
2184 pub async fn query_subjects(
2189 &self,
2190 cortex_token: &str,
2191 query: serde_json::Value,
2192 order_by: serde_json::Value,
2193 limit: Option<u32>,
2194 offset: Option<u32>,
2195 ) -> CortexResult<(Vec<SubjectInfo>, u32)> {
2196 let request = QuerySubjectsRequest {
2197 query,
2198 order_by,
2199 limit,
2200 offset,
2201 };
2202 self.query_subjects_with(cortex_token, &request).await
2203 }
2204
2205 pub async fn get_demographic_attributes(
2211 &self,
2212 cortex_token: &str,
2213 ) -> CortexResult<Vec<DemographicAttribute>> {
2214 let result = self
2215 .call(
2216 Methods::GET_DEMOGRAPHIC_ATTRIBUTES,
2217 serde_json::json!({
2218 "cortexToken": cortex_token,
2219 }),
2220 )
2221 .await?;
2222
2223 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2224 reason: format!("Failed to parse demographic attributes: {e}"),
2225 })
2226 }
2227
2228 pub async fn query_profiles(&self, cortex_token: &str) -> CortexResult<Vec<ProfileInfo>> {
2236 let result = self
2237 .call(
2238 Methods::QUERY_PROFILE,
2239 serde_json::json!({
2240 "cortexToken": cortex_token,
2241 }),
2242 )
2243 .await?;
2244
2245 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2246 reason: format!("Failed to parse profiles: {e}"),
2247 })
2248 }
2249
2250 pub async fn get_current_profile(
2257 &self,
2258 cortex_token: &str,
2259 headset_id: &str,
2260 ) -> CortexResult<CurrentProfileInfo> {
2261 let result = self
2262 .call(
2263 Methods::GET_CURRENT_PROFILE,
2264 serde_json::json!({
2265 "cortexToken": cortex_token,
2266 "headset": headset_id,
2267 }),
2268 )
2269 .await?;
2270
2271 let profile: CurrentProfileInfo =
2272 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2273 reason: format!("Failed to parse current profile info: {e}"),
2274 })?;
2275
2276 Ok(profile)
2277 }
2278
2279 pub async fn setup_profile(
2285 &self,
2286 cortex_token: &str,
2287 headset_id: &str,
2288 profile_name: &str,
2289 action: ProfileAction,
2290 ) -> CortexResult<()> {
2291 self.call(
2292 Methods::SETUP_PROFILE,
2293 serde_json::json!({
2294 "cortexToken": cortex_token,
2295 "headset": headset_id,
2296 "profile": profile_name,
2297 "status": action.as_str(),
2298 }),
2299 )
2300 .await?;
2301
2302 tracing::info!(
2303 profile = profile_name,
2304 action = action.as_str(),
2305 "Profile action completed"
2306 );
2307 Ok(())
2308 }
2309
2310 pub async fn load_guest_profile(
2319 &self,
2320 cortex_token: &str,
2321 headset_id: &str,
2322 ) -> CortexResult<()> {
2323 self.call(
2324 Methods::LOAD_GUEST_PROFILE,
2325 serde_json::json!({
2326 "cortexToken": cortex_token,
2327 "headset": headset_id,
2328 }),
2329 )
2330 .await?;
2331
2332 tracing::info!(headset = headset_id, "Guest profile loaded");
2333 Ok(())
2334 }
2335
2336 pub async fn get_detection_info(
2344 &self,
2345 detection: DetectionType,
2346 ) -> CortexResult<DetectionInfo> {
2347 let result = self
2348 .call(
2349 Methods::GET_DETECTION_INFO,
2350 serde_json::json!({
2351 "detection": detection.as_str(),
2352 }),
2353 )
2354 .await?;
2355
2356 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2357 reason: format!("Failed to parse detection info: {e}"),
2358 })
2359 }
2360
2361 pub async fn training(
2367 &self,
2368 cortex_token: &str,
2369 session_id: &str,
2370 detection: DetectionType,
2371 status: TrainingStatus,
2372 action: &str,
2373 ) -> CortexResult<serde_json::Value> {
2374 self.call(
2375 Methods::TRAINING,
2376 serde_json::json!({
2377 "cortexToken": cortex_token,
2378 "session": session_id,
2379 "detection": detection.as_str(),
2380 "status": status.as_str(),
2381 "action": action,
2382 }),
2383 )
2384 .await
2385 }
2386
2387 pub async fn mental_command_active_action(
2393 &self,
2394 cortex_token: &str,
2395 session_id: &str,
2396 actions: Option<&[&str]>,
2397 ) -> CortexResult<serde_json::Value> {
2398 let mut params = serde_json::json!({
2399 "cortexToken": cortex_token,
2400 "session": session_id,
2401 "status": if actions.is_some() { "set" } else { "get" },
2402 });
2403
2404 if let Some(actions) = actions {
2405 params["actions"] = serde_json::json!(actions);
2406 }
2407
2408 self.call(Methods::MENTAL_COMMAND_ACTIVE_ACTION, params)
2409 .await
2410 }
2411
2412 pub async fn mental_command_action_sensitivity(
2418 &self,
2419 cortex_token: &str,
2420 session_id: &str,
2421 values: Option<&[i32]>,
2422 ) -> CortexResult<serde_json::Value> {
2423 let mut params = serde_json::json!({
2424 "cortexToken": cortex_token,
2425 "session": session_id,
2426 "status": if values.is_some() { "set" } else { "get" },
2427 });
2428
2429 if let Some(values) = values {
2430 params["values"] = serde_json::json!(values);
2431 }
2432
2433 self.call(Methods::MENTAL_COMMAND_ACTION_SENSITIVITY, params)
2434 .await
2435 }
2436
2437 pub async fn mental_command_brain_map(
2443 &self,
2444 cortex_token: &str,
2445 session_id: &str,
2446 ) -> CortexResult<serde_json::Value> {
2447 self.call(
2448 Methods::MENTAL_COMMAND_BRAIN_MAP,
2449 serde_json::json!({
2450 "cortexToken": cortex_token,
2451 "session": session_id,
2452 }),
2453 )
2454 .await
2455 }
2456
2457 pub async fn mental_command_training_threshold(
2471 &self,
2472 cortex_token: &str,
2473 session_id: &str,
2474 ) -> CortexResult<serde_json::Value> {
2475 let request = MentalCommandTrainingThresholdRequest {
2476 session_id: Some(session_id.to_string()),
2477 profile: None,
2478 status: None,
2479 value: None,
2480 };
2481 self.mental_command_training_threshold_with_request(cortex_token, &request)
2482 .await
2483 }
2484
2485 pub async fn mental_command_training_threshold_for_profile(
2494 &self,
2495 cortex_token: &str,
2496 profile: &str,
2497 status: Option<&str>,
2498 value: Option<f64>,
2499 ) -> CortexResult<serde_json::Value> {
2500 let request = MentalCommandTrainingThresholdRequest {
2501 session_id: None,
2502 profile: Some(profile.to_string()),
2503 status: status.map(ToString::to_string),
2504 value,
2505 };
2506 self.mental_command_training_threshold_with_request(cortex_token, &request)
2507 .await
2508 }
2509
2510 pub async fn mental_command_training_threshold_with_request(
2516 &self,
2517 cortex_token: &str,
2518 request: &MentalCommandTrainingThresholdRequest,
2519 ) -> CortexResult<serde_json::Value> {
2520 let params = Self::mental_command_training_threshold_params(
2521 cortex_token,
2522 request.session_id.as_deref(),
2523 request.profile.as_deref(),
2524 request.status.as_deref(),
2525 request.value,
2526 )?;
2527
2528 self.call(Methods::MENTAL_COMMAND_TRAINING_THRESHOLD, params)
2529 .await
2530 }
2531
2532 #[deprecated(
2543 note = "Use `mental_command_training_threshold_with_request` and `MentalCommandTrainingThresholdRequest` instead."
2544 )]
2545 pub async fn mental_command_training_threshold_with_params(
2546 &self,
2547 cortex_token: &str,
2548 session_id: Option<&str>,
2549 profile: Option<&str>,
2550 status: Option<&str>,
2551 value: Option<f64>,
2552 ) -> CortexResult<serde_json::Value> {
2553 let request = MentalCommandTrainingThresholdRequest {
2554 session_id: session_id.map(ToString::to_string),
2555 profile: profile.map(ToString::to_string),
2556 status: status.map(ToString::to_string),
2557 value,
2558 };
2559 self.mental_command_training_threshold_with_request(cortex_token, &request)
2560 .await
2561 }
2562
2563 pub async fn get_trained_signature_actions(
2571 &self,
2572 cortex_token: &str,
2573 detection: DetectionType,
2574 profile: Option<&str>,
2575 session: Option<&str>,
2576 ) -> CortexResult<TrainedSignatureActions> {
2577 let mut params = serde_json::json!({
2578 "cortexToken": cortex_token,
2579 "detection": detection.as_str(),
2580 });
2581
2582 if let Some(p) = profile {
2583 params["profile"] = serde_json::json!(p);
2584 }
2585 if let Some(s) = session {
2586 params["session"] = serde_json::json!(s);
2587 }
2588
2589 let result = self
2590 .call(Methods::GET_TRAINED_SIGNATURE_ACTIONS, params)
2591 .await?;
2592
2593 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2594 reason: format!("Failed to parse trained signature actions: {e}"),
2595 })
2596 }
2597
2598 pub async fn get_training_time(
2604 &self,
2605 cortex_token: &str,
2606 detection: DetectionType,
2607 session_id: &str,
2608 ) -> CortexResult<TrainingTime> {
2609 let result = self
2610 .call(
2611 Methods::GET_TRAINING_TIME,
2612 serde_json::json!({
2613 "cortexToken": cortex_token,
2614 "detection": detection.as_str(),
2615 "session": session_id,
2616 }),
2617 )
2618 .await?;
2619
2620 serde_json::from_value(result).map_err(|e| CortexError::ProtocolError {
2621 reason: format!("Failed to parse training time: {e}"),
2622 })
2623 }
2624
2625 pub async fn facial_expression_signature_type_with(
2634 &self,
2635 cortex_token: &str,
2636 request: &FacialExpressionSignatureTypeRequest,
2637 ) -> CortexResult<serde_json::Value> {
2638 self.call(
2639 Methods::FACIAL_EXPRESSION_SIGNATURE_TYPE,
2640 Self::facial_expression_signature_type_params(cortex_token, request),
2641 )
2642 .await
2643 }
2644
2645 #[deprecated(
2654 note = "Use `facial_expression_signature_type_with` and `FacialExpressionSignatureTypeRequest` instead."
2655 )]
2656 pub async fn facial_expression_signature_type(
2657 &self,
2658 cortex_token: &str,
2659 status: &str,
2660 profile: Option<&str>,
2661 session: Option<&str>,
2662 signature: Option<&str>,
2663 ) -> CortexResult<serde_json::Value> {
2664 let request = FacialExpressionSignatureTypeRequest {
2665 status: status.to_string(),
2666 profile: profile.map(ToString::to_string),
2667 session: session.map(ToString::to_string),
2668 signature: signature.map(ToString::to_string),
2669 };
2670 self.facial_expression_signature_type_with(cortex_token, &request)
2671 .await
2672 }
2673
2674 pub async fn facial_expression_threshold_with(
2684 &self,
2685 cortex_token: &str,
2686 request: &FacialExpressionThresholdRequest,
2687 ) -> CortexResult<serde_json::Value> {
2688 self.call(
2689 Methods::FACIAL_EXPRESSION_THRESHOLD,
2690 Self::facial_expression_threshold_params(cortex_token, request),
2691 )
2692 .await
2693 }
2694
2695 #[deprecated(
2705 note = "Use `facial_expression_threshold_with` and `FacialExpressionThresholdRequest` instead."
2706 )]
2707 pub async fn facial_expression_threshold(
2708 &self,
2709 cortex_token: &str,
2710 status: &str,
2711 action: &str,
2712 profile: Option<&str>,
2713 session: Option<&str>,
2714 value: Option<u32>,
2715 ) -> CortexResult<serde_json::Value> {
2716 let request = FacialExpressionThresholdRequest {
2717 status: status.to_string(),
2718 action: action.to_string(),
2719 profile: profile.map(ToString::to_string),
2720 session: session.map(ToString::to_string),
2721 value,
2722 };
2723 self.facial_expression_threshold_with(cortex_token, &request)
2724 .await
2725 }
2726
2727 pub fn is_connected(&self) -> bool {
2731 self.reader_running.load(Ordering::SeqCst)
2732 }
2733
2734 pub async fn stop_reader(&mut self) {
2736 self.reader_running.store(false, Ordering::SeqCst);
2737 let _ = self.reader_shutdown.send(true);
2738 if let Some(handle) = self.reader_handle.take() {
2739 let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
2740 }
2741 }
2742
2743 pub async fn disconnect(&mut self) -> CortexResult<()> {
2749 self.stop_reader().await;
2750
2751 let mut writer = self.writer.lock().await;
2752 let _ = writer.close().await;
2753
2754 Ok(())
2755 }
2756}
2757
2758#[cfg(test)]
2759mod tests {
2760 use super::*;
2761
/// Default options must produce an empty JSON object.
#[test]
fn test_query_headsets_params_default_is_empty() {
    let options = QueryHeadsetsOptions::default();
    let params = CortexClient::query_headsets_params(options);

    assert_eq!(params, serde_json::json!({}));
}
2767
/// With only an id set, `id` is present and `includeFlexMappings` is absent.
#[test]
fn test_query_headsets_params_with_id() {
    let options = QueryHeadsetsOptions {
        id: Some("HS-123".into()),
        include_flex_mappings: false,
    };
    let params = CortexClient::query_headsets_params(options);

    assert_eq!(params["id"], "HS-123");
    assert!(params.get("includeFlexMappings").is_none());
}
2777
/// With only the flag set, `includeFlexMappings` is true and `id` is absent.
#[test]
fn test_query_headsets_params_with_include_flex_mappings() {
    let options = QueryHeadsetsOptions {
        id: None,
        include_flex_mappings: true,
    };
    let params = CortexClient::query_headsets_params(options);

    assert_eq!(params["includeFlexMappings"], true);
    assert!(params.get("id").is_none());
}
2787
/// With both options set, both keys appear in the parameters.
#[test]
fn test_query_headsets_params_with_both_options() {
    let options = QueryHeadsetsOptions {
        id: Some("HS-123".into()),
        include_flex_mappings: true,
    };
    let params = CortexClient::query_headsets_params(options);

    assert_eq!(params["id"], "HS-123");
    assert_eq!(params["includeFlexMappings"], true);
}
2797
/// Clock-sync parameters carry `headset`, `monotonicTime` and `systemTime`,
/// and neither `cortexToken` nor `headsetId`.
#[test]
fn test_sync_with_headset_clock_params_use_docs_shape() {
    let params = CortexClient::sync_with_headset_clock_params_with_times("HS-123", 12.34, 5678);

    // Keys that must be present, with the values that were passed in.
    assert_eq!(params["headset"], "HS-123");
    assert_eq!(params["monotonicTime"], 12.34);
    assert_eq!(params["systemTime"], 5678);

    // Keys that must not be sent.
    assert!(params.get("cortexToken").is_none());
    assert!(params.get("headsetId").is_none());
}
2807
/// The headset id is sent under both `headsetId` and `headset`, alongside
/// the optional fields when they are provided.
#[test]
fn test_update_headset_custom_info_uses_headset_id() {
    let headband_position = Some("front");
    let custom_name = Some("My Headset");
    let params = CortexClient::update_headset_custom_info_params(
        "token",
        "HS-123",
        headband_position,
        custom_name,
    );

    assert_eq!(params["headsetId"], "HS-123");
    assert_eq!(params["headset"], "HS-123");
    assert_eq!(params["headbandPosition"], "front");
    assert_eq!(params["customName"], "My Headset");
}
2821
2822 #[test]
2823 fn test_update_headset_custom_info_omits_optional_fields_when_none() {
2824 let params = CortexClient::update_headset_custom_info_params("token", "HS-123", None, None);
2825 assert_eq!(params["headsetId"], "HS-123");
2826 assert_eq!(params["headset"], "HS-123");
2827 assert!(params.get("headbandPosition").is_none());
2828 assert!(params.get("customName").is_none());
2829 }
2830
2831 #[test]
2832 fn test_config_mapping_create_params_validation() {
2833 let empty_name = CortexClient::config_mapping_params(
2834 "token",
2835 ConfigMappingRequest::Create {
2836 name: " ".into(),
2837 mappings: serde_json::json!({"CMS":"TP9"}),
2838 },
2839 );
2840 assert!(matches!(empty_name, Err(CortexError::ProtocolError { .. })));
2841
2842 let invalid = CortexClient::config_mapping_params(
2843 "token",
2844 ConfigMappingRequest::Create {
2845 name: "cfg".into(),
2846 mappings: serde_json::json!(["not-an-object"]),
2847 },
2848 );
2849 assert!(matches!(invalid, Err(CortexError::ProtocolError { .. })));
2850
2851 let valid = CortexClient::config_mapping_params(
2852 "token",
2853 ConfigMappingRequest::Create {
2854 name: "cfg".into(),
2855 mappings: serde_json::json!({"CMS":"TP9"}),
2856 },
2857 )
2858 .unwrap();
2859
2860 assert!(matches!(valid.0, ConfigMappingMode::Create));
2861 assert_eq!(valid.1["status"], "create");
2862 assert_eq!(valid.1["name"], "cfg");
2863 }
2864
2865 #[test]
2866 fn test_config_mapping_read_and_delete_require_uuid() {
2867 let read = CortexClient::config_mapping_params(
2868 "token",
2869 ConfigMappingRequest::Read {
2870 uuid: String::new(),
2871 },
2872 );
2873 assert!(matches!(read, Err(CortexError::ProtocolError { .. })));
2874
2875 let delete = CortexClient::config_mapping_params(
2876 "token",
2877 ConfigMappingRequest::Delete {
2878 uuid: String::new(),
2879 },
2880 );
2881 assert!(matches!(delete, Err(CortexError::ProtocolError { .. })));
2882 }
2883
2884 #[test]
2885 fn test_config_mapping_update_requires_name_or_mappings() {
2886 let missing = CortexClient::config_mapping_params(
2887 "token",
2888 ConfigMappingRequest::Update {
2889 uuid: "uuid-1".into(),
2890 name: None,
2891 mappings: None,
2892 },
2893 );
2894 assert!(matches!(missing, Err(CortexError::ProtocolError { .. })));
2895
2896 let valid = CortexClient::config_mapping_params(
2897 "token",
2898 ConfigMappingRequest::Update {
2899 uuid: "uuid-1".into(),
2900 name: Some("new".into()),
2901 mappings: None,
2902 },
2903 )
2904 .unwrap();
2905 assert!(matches!(valid.0, ConfigMappingMode::Update));
2906 assert_eq!(valid.1["uuid"], "uuid-1");
2907 assert_eq!(valid.1["name"], "new");
2908 }
2909
2910 #[test]
2911 fn test_config_mapping_update_validation_cases() {
2912 let empty_uuid = CortexClient::config_mapping_params(
2913 "token",
2914 ConfigMappingRequest::Update {
2915 uuid: " ".into(),
2916 name: Some("new-name".into()),
2917 mappings: None,
2918 },
2919 );
2920 assert!(matches!(empty_uuid, Err(CortexError::ProtocolError { .. })));
2921
2922 let empty_name = CortexClient::config_mapping_params(
2923 "token",
2924 ConfigMappingRequest::Update {
2925 uuid: "uuid-1".into(),
2926 name: Some(String::new()),
2927 mappings: None,
2928 },
2929 );
2930 assert!(matches!(empty_name, Err(CortexError::ProtocolError { .. })));
2931
2932 let invalid_mappings = CortexClient::config_mapping_params(
2933 "token",
2934 ConfigMappingRequest::Update {
2935 uuid: "uuid-1".into(),
2936 name: None,
2937 mappings: Some(serde_json::json!(["bad-shape"])),
2938 },
2939 );
2940 assert!(matches!(
2941 invalid_mappings,
2942 Err(CortexError::ProtocolError { .. })
2943 ));
2944 }
2945
2946 #[test]
2947 fn test_mental_command_training_threshold_params_get_and_set_modes() {
2948 let get_params = CortexClient::mental_command_training_threshold_params(
2949 "token",
2950 Some("session-1"),
2951 None,
2952 None,
2953 None,
2954 )
2955 .unwrap();
2956 assert_eq!(get_params["status"], "get");
2957 assert_eq!(get_params["session"], "session-1");
2958 assert!(get_params.get("value").is_none());
2959
2960 let set_params = CortexClient::mental_command_training_threshold_params(
2961 "token",
2962 None,
2963 Some("profile-a"),
2964 Some("set"),
2965 Some(0.42),
2966 )
2967 .unwrap();
2968 assert_eq!(set_params["status"], "set");
2969 assert_eq!(set_params["profile"], "profile-a");
2970 assert_eq!(set_params["value"], 0.42);
2971 }
2972
2973 #[test]
2974 fn test_mental_command_training_threshold_params_validation() {
2975 let both = CortexClient::mental_command_training_threshold_params(
2976 "token",
2977 Some("session-1"),
2978 Some("profile-a"),
2979 None,
2980 None,
2981 );
2982 assert!(matches!(both, Err(CortexError::ProtocolError { .. })));
2983
2984 let neither =
2985 CortexClient::mental_command_training_threshold_params("token", None, None, None, None);
2986 assert!(matches!(neither, Err(CortexError::ProtocolError { .. })));
2987 }
2988}