1use std::future::Future;
6use std::sync::Arc;
7
8use tokio::sync::RwLock;
9
10use crate::error::{SdkError, SdkResult};
11use crate::events::event_loop::EventLoop;
12use crate::events::ServerEvent;
13use crate::http::HttpClient;
14use crate::mqtt::client::{MqttClient, MqttConnectionState, MqttEvent};
15
16use crate::services::{
17 AuthService, ConversationService, FriendService, MessageService, UserService,
18};
19
20pub const ENV_API_KEY: &str = "AGENTLINK_API_KEY";
26
27pub const ENV_API_URL: &str = "AGENTLINK_API_URL";
29
30pub const DEFAULT_API_URL: &str = "https://agentlink-api.feedecho.xyz";
32
33pub const API_PATH_PREFIX: &str = "/api/v1";
35
36pub const DEFAULT_MQTT_PORT: u16 = 8883;
38
39fn derive_mqtt_url_from_api_url(api_url: &str) -> String {
49 let host = api_url
51 .strip_prefix("https://")
52 .or_else(|| api_url.strip_prefix("http://"))
53 .unwrap_or(api_url);
54
55 let host = host.split(':').next().unwrap_or(host);
57
58 let host = host.split('/').next().unwrap_or(host);
60
61 let parts: Vec<&str> = host.split('.').collect();
64 let base_domain = if parts.len() >= 2 {
65 format!("{}.{}", parts[parts.len() - 2], parts[parts.len() - 1])
67 } else {
68 host.to_string()
69 };
70
71 format!("mqtts://mqtt.{}:{}", base_domain, DEFAULT_MQTT_PORT)
73}
74
75fn get_full_api_url(base_url: &str) -> String {
77 if base_url.ends_with("/api/v1") {
79 return base_url.to_string();
80 }
81
82 format!("{}{}", base_url.trim_end_matches('/'), API_PATH_PREFIX)
84}
85
86#[derive(Debug, Clone)]
92pub struct ClientConfig {
93 pub api_url: String,
95 pub mqtt_broker_url: String,
97 pub token: Option<String>,
99 pub user_id: Option<String>,
101}
102
103impl ClientConfig {
104 pub fn new(api_url: &str, mqtt_broker_url: &str) -> Self {
106 Self {
107 api_url: get_full_api_url(api_url),
108 mqtt_broker_url: mqtt_broker_url.to_string(),
109 token: None,
110 user_id: None,
111 }
112 }
113
114 pub fn from_env() -> Result<Self, SdkError> {
134 let api_key = std::env::var(ENV_API_KEY)
135 .map_err(|_| SdkError::Config(format!(
136 "Missing environment variable: {}. Please set your API key.",
137 ENV_API_KEY
138 )))?;
139
140 let api_base_url = std::env::var(ENV_API_URL)
141 .unwrap_or_else(|_| DEFAULT_API_URL.to_string());
142
143 let mqtt_broker_url = derive_mqtt_url_from_api_url(&api_base_url);
145
146 let api_url = get_full_api_url(&api_base_url);
148
149 Ok(Self {
150 api_url,
151 mqtt_broker_url,
152 token: Some(api_key),
153 user_id: None,
154 })
155 }
156
157 pub fn try_from_env() -> Option<Self> {
169 Self::from_env().ok()
170 }
171
172 pub fn with_token(mut self, token: String) -> Self {
174 self.token = Some(token);
175 self
176 }
177
178 pub fn with_user_id(mut self, user_id: String) -> Self {
180 self.user_id = Some(user_id);
181 self
182 }
183}
184
185impl Default for ClientConfig {
186 fn default() -> Self {
187 Self {
188 api_url: get_full_api_url(DEFAULT_API_URL),
189 mqtt_broker_url: derive_mqtt_url_from_api_url(DEFAULT_API_URL),
190 token: None,
191 user_id: None,
192 }
193 }
194}
195
196pub struct AgentLinkClient {
241 config: ClientConfig,
242 http: Arc<HttpClient>,
243 mqtt: Arc<RwLock<MqttClient>>,
244 event_loop: Arc<RwLock<EventLoop>>,
245 event_loop_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
246}
247
248impl AgentLinkClient {
249 pub fn new(config: ClientConfig) -> Self {
251 let http = Arc::new(HttpClient::new(&config.api_url));
252 let mqtt = Arc::new(RwLock::new(MqttClient::new(&config.mqtt_broker_url)));
253 let (event_loop, _) = EventLoop::new();
254
255 if let Some(ref token) = config.token {
257 http.token_manager().set_token(token);
258 }
259
260 Self {
261 config,
262 http,
263 mqtt,
264 event_loop: Arc::new(RwLock::new(event_loop)),
265 event_loop_handle: Arc::new(RwLock::new(None)),
266 }
267 }
268
269 pub fn from_env() -> Result<Self, SdkError> {
285 let config = ClientConfig::from_env()?;
286 Ok(Self::new(config))
287 }
288
289 pub fn try_from_env() -> Option<Self> {
291 Self::from_env().ok()
292 }
293
294 pub fn from_api_key(api_key: &str) -> Self {
301 let config = ClientConfig::default().with_token(api_key.to_string());
302 Self::new(config)
303 }
304
305 pub fn from_api_key_with_urls(api_key: &str, api_url: &str, mqtt_url: &str) -> Self {
316 let config = ClientConfig::new(api_url, mqtt_url).with_token(api_key.to_string());
317 Self::new(config)
318 }
319
320 pub fn config(&self) -> &ClientConfig {
322 &self.config
323 }
324
325 pub fn update_config(&mut self, config: ClientConfig) {
327 self.config = config;
328 }
329
330 pub fn auth(&self) -> AuthService {
336 AuthService::new(self.http.clone())
337 }
338
339 pub fn users(&self) -> UserService {
343 UserService::new(self.http.clone())
344 }
345
346 pub fn messages(&self) -> MessageService {
350 MessageService::with_mqtt(self.http.clone(), self.mqtt.clone())
351 }
352
353 pub fn conversations(&self) -> ConversationService {
357 ConversationService::with_mqtt(self.http.clone(), self.mqtt.clone())
358 }
359
360 pub fn friends(&self) -> FriendService {
364 FriendService::with_mqtt(self.http.clone(), self.mqtt.clone())
365 }
366
367 pub async fn connect_mqtt(&self, token: &str, user_id: &str) -> SdkResult<()> {
379 let mqtt = self.mqtt.read().await;
380 mqtt.connect(token, user_id).await
381 }
382
383 pub async fn disconnect_mqtt(&self) -> SdkResult<()> {
385 let mqtt = self.mqtt.read().await;
386 mqtt.disconnect().await
387 }
388
389 pub async fn mqtt_connection_state(&self) -> MqttConnectionState {
391 let mqtt = self.mqtt.read().await;
392 mqtt.connection_state().await
393 }
394
395 pub async fn is_mqtt_connected(&self) -> bool {
397 let mqtt = self.mqtt.read().await;
398 mqtt.is_connected().await
399 }
400
401 pub async fn subscribe(&self, topic: &str) -> SdkResult<()> {
406 let mqtt = self.mqtt.read().await;
407 mqtt.subscribe(topic).await
408 }
409
410 pub async fn unsubscribe(&self, topic: &str) -> SdkResult<()> {
415 let mqtt = self.mqtt.read().await;
416 mqtt.unsubscribe(topic).await
417 }
418
419 pub async fn publish(&self, topic: &str, payload: &[u8]) -> SdkResult<()> {
425 let mqtt = self.mqtt.read().await;
426 mqtt.publish(topic, payload).await
427 }
428
429 pub async fn on<T, F, Fut>(&self, event_type: &str, callback: F)
444 where
445 T: serde::de::DeserializeOwned + Send + Sync + 'static,
446 F: Fn(ServerEvent<T>) -> Fut + Send + Sync + 'static,
447 Fut: Future<Output = ()> + Send + 'static,
448 {
449 let event_loop = self.event_loop.read().await;
450 event_loop.on_event(event_type, callback).await;
451 }
452
453 pub async fn off(&self, event_type: &str) {
460 let event_loop = self.event_loop.read().await;
461 event_loop.off_event(event_type).await;
462 }
463
464 pub async fn clear_callbacks(&self) {
471 let event_loop = self.event_loop.read().await;
472 event_loop.clear_callbacks().await;
473 }
474
475 pub fn event_loop(&self) -> Arc<RwLock<EventLoop>> {
482 self.event_loop.clone()
483 }
484
485 pub async fn start_event_loop(&self) -> SdkResult<()> {
504 {
506 let event_loop = self.event_loop.read().await;
507 if event_loop.is_running().await {
508 tracing::warn!("[AgentLinkClient] Event loop already running");
509 return Ok(());
510 }
511 }
512
513 let mqtt_rx = self.mqtt.read().await.take_event_receiver().await
515 .ok_or_else(|| crate::error::SdkError::Mqtt("Event receiver already taken".to_string()))?;
516
517 let (event_loop, event_rx) = EventLoop::new();
519
520 {
522 let mut el = self.event_loop.write().await;
523 *el = event_loop;
524 }
525
526 let event_loop = self.event_loop.clone();
528 let handle = tokio::spawn(async move {
529 tracing::debug!("[AgentLinkClient] Event loop task started");
530
531 let event_loop = event_loop.read().await.clone();
533 event_loop.start(event_rx).await;
534
535 tracing::debug!("[AgentLinkClient] Event loop task ended");
536 });
537
538 let event_sender = self.event_loop.read().await.event_sender();
540 tokio::spawn(async move {
541 let mut mqtt_rx = mqtt_rx;
542 loop {
543 match mqtt_rx.recv().await {
544 Some(MqttEvent::MessageReceived { topic: _, payload }) => {
545 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&payload) {
547 if let Some(event_type) = value.get("event_type").and_then(|v| v.as_str()) {
548 let _ = event_sender.send((event_type.to_string(), value));
549 }
550 }
551 }
552 Some(MqttEvent::Connected) => {
553 tracing::debug!("[AgentLinkClient] MQTT connected");
554 }
555 Some(MqttEvent::Disconnected) => {
556 tracing::debug!("[AgentLinkClient] MQTT disconnected");
557 break;
558 }
559 None => {
560 tracing::debug!("[AgentLinkClient] MQTT event channel closed");
561 break;
562 }
563 _ => {}
564 }
565 }
566 });
567
568 *self.event_loop_handle.write().await = Some(handle);
569
570 tracing::debug!("[AgentLinkClient] Event loop started");
571 Ok(())
572 }
573
574 pub async fn stop_event_loop(&self) {
576 if let Some(handle) = self.event_loop_handle.write().await.take() {
577 handle.abort();
578 tracing::debug!("[AgentLinkClient] Event loop stopped");
579 }
580 }
581
582 pub async fn mqtt_subscriptions(&self) -> Vec<String> {
584 let mqtt = self.mqtt.read().await;
585 mqtt.subscriptions().await
586 }
587
588 pub async fn connect_and_start(&self) -> SdkResult<()> {
612 let api_key = self.get_token()
613 .ok_or_else(|| SdkError::Config("No API key set. Use from_env() or from_api_key()".to_string()))?;
614
615 self.connect_mqtt_with_api_key(&api_key).await?;
617
618 self.start_event_loop().await?;
620
621 tracing::info!("[AgentLinkClient] Connected and started with API key");
622 Ok(())
623 }
624
625 pub async fn connect_mqtt_with_api_key(&self, api_key: &str) -> SdkResult<()> {
632 let mqtt = self.mqtt.read().await;
633 mqtt.connect_with_api_key(api_key).await
634 }
635
636 pub async fn login(&self, email: &str, code: &str) -> SdkResult<crate::protocols::auth::LoginResponse> {
652 let response = self.auth().login_with_email_code(email, code).await?;
653
654 self.connect_mqtt(&response.token, &response.user.id).await?;
656
657 if let Err(e) = self.start_event_loop().await {
659 tracing::warn!("[AgentLinkClient] Failed to start event loop: {}", e);
660 }
661
662 Ok(response)
663 }
664
665 pub fn set_token(&self, token: &str) {
670 self.http.token_manager().set_token(token);
671 }
672
673 pub fn with_token(&self, token: &str) -> &Self {
684 self.http.token_manager().set_token(token);
685 self
686 }
687
688 pub fn clear_token(&self) {
690 self.http.token_manager().clear_token();
691 }
692
693 pub fn get_token(&self) -> Option<String> {
695 self.http.token_manager().get_token()
696 }
697}
698
699#[cfg(test)]
700mod tests {
701 use super::*;
702
703 #[test]
704 fn test_derive_mqtt_url() {
705 assert_eq!(
707 derive_mqtt_url_from_api_url("https://agentlink-api.feedecho.xyz"),
708 "mqtts://mqtt.feedecho.xyz:8883"
709 );
710
711 assert_eq!(
713 derive_mqtt_url_from_api_url("http://localhost:9600"),
714 "mqtts://mqtt.localhost:8883"
715 );
716
717 assert_eq!(
719 derive_mqtt_url_from_api_url("https://api.example.com/api/v1"),
720 "mqtts://mqtt.example.com:8883"
721 );
722 }
723
724 #[test]
725 fn test_get_full_api_url() {
726 assert_eq!(
727 get_full_api_url("https://agentlink-api.feedecho.xyz"),
728 "https://agentlink-api.feedecho.xyz/api/v1"
729 );
730
731 assert_eq!(
733 get_full_api_url("https://agentlink-api.feedecho.xyz/api/v1"),
734 "https://agentlink-api.feedecho.xyz/api/v1"
735 );
736
737 assert_eq!(
739 get_full_api_url("https://agentlink-api.feedecho.xyz/"),
740 "https://agentlink-api.feedecho.xyz/api/v1"
741 );
742 }
743
744 #[test]
745 fn test_client_config() {
746 let config = ClientConfig::new(
747 "http://localhost:8080",
748 "mqtts://localhost:8883",
749 );
750 assert_eq!(config.api_url, "http://localhost:8080/api/v1");
751 assert_eq!(config.mqtt_broker_url, "mqtts://localhost:8883");
752 }
753
754 #[test]
755 fn test_client_config_default() {
756 let config = ClientConfig::default();
757 assert_eq!(config.api_url, "https://agentlink-api.feedecho.xyz/api/v1");
758 assert_eq!(config.mqtt_broker_url, "mqtts://mqtt.feedecho.xyz:8883");
759 assert!(config.token.is_none());
760 }
761
762 #[test]
763 fn test_client_config_with_token() {
764 let config = ClientConfig::default()
765 .with_token("test-api-key".to_string());
766 assert_eq!(config.token, Some("test-api-key".to_string()));
767 }
768
769 #[test]
770 fn test_client_config_from_env_missing() {
771 std::env::remove_var(ENV_API_KEY);
773
774 let result = ClientConfig::from_env();
776 assert!(result.is_err());
777 }
778
779 #[test]
780 fn test_client_config_from_env_with_api_key() {
781 std::env::set_var(ENV_API_KEY, "test-api-key-123");
783 std::env::remove_var(ENV_API_URL);
784
785 let config = ClientConfig::from_env().expect("Should have config");
786 assert_eq!(config.token, Some("test-api-key-123".to_string()));
787 assert_eq!(config.api_url, "https://agentlink-api.feedecho.xyz/api/v1");
789 assert_eq!(config.mqtt_broker_url, "mqtts://mqtt.feedecho.xyz:8883");
791
792 std::env::remove_var(ENV_API_KEY);
794 }
795
796 #[test]
797 fn test_client_config_from_env_with_custom_url() {
798 std::env::set_var(ENV_API_KEY, "test-api-key-456");
800 std::env::set_var(ENV_API_URL, "https://api.custom.com");
801
802 let config = ClientConfig::from_env().expect("Should have config");
803 assert_eq!(config.token, Some("test-api-key-456".to_string()));
804 assert_eq!(config.api_url, "https://api.custom.com/api/v1");
806 assert_eq!(config.mqtt_broker_url, "mqtts://mqtt.custom.com:8883");
808
809 std::env::remove_var(ENV_API_KEY);
811 std::env::remove_var(ENV_API_URL);
812 }
813
814 #[tokio::test]
815 async fn test_client_new() {
816 let config = ClientConfig::default();
817 let client = AgentLinkClient::new(config);
818
819 assert!(!client.is_mqtt_connected().await);
820 }
821
822 #[tokio::test]
823 async fn test_client_new_with_token() {
824 let config = ClientConfig::default()
825 .with_token("test-api-key".to_string());
826 let client = AgentLinkClient::new(config);
827
828 assert_eq!(client.get_token(), Some("test-api-key".to_string()));
829 }
830
831 #[test]
832 fn test_client_from_api_key() {
833 let client = AgentLinkClient::from_api_key("my-api-key");
834 assert_eq!(client.get_token(), Some("my-api-key".to_string()));
835 assert_eq!(client.config().api_url, "https://agentlink-api.feedecho.xyz/api/v1");
836 assert_eq!(client.config().mqtt_broker_url, "mqtts://mqtt.feedecho.xyz:8883");
837 }
838
839 #[test]
840 fn test_client_from_api_key_with_urls() {
841 let client = AgentLinkClient::from_api_key_with_urls(
842 "my-api-key",
843 "https://custom.api.com/api/v1",
844 "mqtts://custom.mqtt.com:8883",
845 );
846 assert_eq!(client.get_token(), Some("my-api-key".to_string()));
847 assert_eq!(client.config().api_url, "https://custom.api.com/api/v1");
848 assert_eq!(client.config().mqtt_broker_url, "mqtts://custom.mqtt.com:8883");
849 }
850
851 #[test]
852 fn test_client_from_env() {
853 std::env::set_var(ENV_API_KEY, "env-api-key");
854 std::env::remove_var(ENV_API_URL);
855
856 let client = AgentLinkClient::from_env().expect("Should have client");
857 assert_eq!(client.get_token(), Some("env-api-key".to_string()));
858 assert_eq!(client.config().api_url, "https://agentlink-api.feedecho.xyz/api/v1");
859
860 std::env::remove_var(ENV_API_KEY);
862 }
863}