cloudpub_sdk/
connection.rs

1use anyhow::{bail, Result};
2use cloudpub_client::commands::PublishArgs;
3pub use cloudpub_client::config::ClientConfig;
4use cloudpub_client::ping;
5use cloudpub_client::shell::get_cache_dir;
6use cloudpub_common::logging::WorkerGuard;
7use cloudpub_common::protocol::message::Message;
8use cloudpub_common::protocol::{
9    Break, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
10    EndpointStop, ServerEndpoint,
11};
12use parking_lot::RwLock;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{broadcast, mpsc, Mutex};
17use tokio::time::{sleep, timeout};
18use tracing::debug;
19
20use crate::builder::ConnectionBuilder;
21
22/// Псевдоним типа для функции проверки сигналов.
23///
24/// Эта функция вызывается периодически для проверки сигналов прерывания
25/// (например, Ctrl+C в привязках Python). Возвращает ошибку для прерывания операции.
26///
27/// # Пример
28/// ```no_run
29/// use std::sync::Arc;
30/// use anyhow::Result;
31/// use cloudpub_sdk::CheckSignalFn;
32///
33/// fn interrupted() -> bool {
34///     // Ваша логика проверки прерывания здесь
35///     false
36/// }
37///
38/// let check_signal: CheckSignalFn = Arc::new(|| {
39///     if interrupted() {
40///         anyhow::bail!("Операция прервана пользователем")
41///     }
42///     Ok(())
43/// });
44/// ```
45pub type CheckSignalFn = Arc<dyn Fn() -> Result<()> + Send + Sync>;
46
47/// Представляет различные события, происходящие в течение жизненного цикла соединения.
48///
49/// События генерируются асинхронно при изменении состояния соединения или
50/// в ответ на операции. Используйте `Connection::wait_for_event()` для ожидания
51/// конкретных событий.
52///
53/// # Поток событий
54///
55/// 1. `Idle` → Начальное состояние (редко видно пользователям)
56/// 2. `Authenticating` → Когда начинается аутентификация
57/// 3. `Connected` → Успешно аутентифицирован и готов к операциям
58/// 4. `Endpoint` → Ответ на операции с конечной точкой (register, publish)
59/// 5. `List` → Ответ на операции листинга
60/// 6. `Acknowledged` → Операция успешно завершена
61/// 7. `Error` → Операция завершилась с ошибкой
62/// 8. `Closed` → Соединение прервано
63#[derive(Debug, Clone, PartialEq)]
64pub enum ConnectionEvent {
65    /// Начальное состояние перед установлением соединения.
66    /// Это состояние обычно не наблюдается пользователями.
67    #[allow(dead_code)]
68    Idle,
69    /// Идет аутентификация.
70    /// Это событие генерируется, когда клиент начинает аутентификацию с сервером.
71    Authenticating,
72    /// Успешно аутентифицирован и подключен к серверу.
73    /// После этого события соединение готово к операциям.
74    Connected,
75    /// Результат операции конечной точки (register, publish, ping).
76    /// Содержит детали конечной точки, включая ее GUID и URL.
77    Endpoint(Box<ServerEndpoint>),
78    /// Результат операции листинга.
79    /// Содержит все зарегистрированные конечные точки с их текущим статусом.
80    List(Vec<ServerEndpoint>),
81    /// Операция была подтверждена сервером.
82    /// Генерируется для операций, таких как stop, unpublish и clean.
83    Acknowledged,
84    /// Произошла ошибка во время операции.
85    /// Содержит сообщение об ошибке от сервера.
86    Error(String),
87    /// Соединение было закрыто.
88    /// Это может произойти из-за проблем с сетью, отключения сервера или явного разрыва соединения.
89    Closed,
90}
91
92/// Управляет соединением с сервером CloudPub.
93///
94/// Структура `Connection` предоставляет высокоуровневый интерфейс для взаимодействия с
95/// сервером CloudPub, обработки аутентификации, регистрации сервисов и
96/// управления жизненным циклом.
97///
98/// # Пример
99///
100/// ```no_run
101/// # async fn example() -> anyhow::Result<()> {
102/// use cloudpub_sdk::Connection;
103/// use cloudpub_common::protocol::{Protocol, Auth, Endpoint};
104///
105/// // Создание и настройка соединения
106/// let mut conn = Connection::builder()
107///     .credentials("user@example.com", "password")
108///     .timeout_secs(30)
109///     .build()
110///     .await?;
111///
112/// // Регистрация сервиса
113/// let endpoint = conn.publish(
114///     Protocol::Http,
115///     "localhost:8080".to_string(),
116///     Some("My Service".to_string()),
117///     Some(Auth::None),
118///     None,
119///     None,
120///     None,
121/// ).await?;
122///
123/// println!("Сервис доступен по адресу: {}", endpoint.as_url());
124///
125/// // Список всех сервисов
126/// let services = conn.ls().await?;
127/// for service in services {
128///     let name = service.client.as_ref()
129///         .and_then(|c| c.description.clone())
130///         .unwrap_or_else(|| "Безымянный".to_string());
131///     println!("Сервис: {} - {}", name, service.as_url());
132/// }
133/// # Ok(())
134/// # }
135/// ```
136///
137/// # Потокобезопасность
138///
139/// `Connection` использует внутреннюю синхронизацию и может быть безопасно разделен между
140/// потоками с использованием `Arc<Mutex<Connection>>` при необходимости.
141pub struct Connection {
142    pub config: Arc<RwLock<ClientConfig>>,
143    pub command_tx: mpsc::Sender<Message>,
144    pub(crate) event_tx: broadcast::Sender<ConnectionEvent>,
145    pub(crate) receiver_handle: Option<tokio::task::JoinHandle<()>>,
146    pub(crate) timeout: Arc<Mutex<Duration>>,
147    pub(crate) check_signal_fn: Option<CheckSignalFn>,
148    pub(crate) _worker_guard: WorkerGuard,
149    pub(crate) client_handle: Option<tokio::task::JoinHandle<()>>,
150}
151
152impl Connection {
153    /// Создает новый построитель соединения для настройки параметров соединения.
154    ///
155    /// Это рекомендуемый способ создания нового соединения.
156    ///
157    /// # Пример
158    ///
159    /// ```no_run
160    /// # async fn example() -> anyhow::Result<()> {
161    /// use cloudpub_sdk::Connection;
162    ///
163    /// let conn = Connection::builder()
164    ///     .config_path("/path/to/config.toml")
165    ///     .credentials("user@example.com", "password")
166    ///     .timeout_secs(30)
167    ///     .verbose(true)
168    ///     .build()
169    ///     .await?;
170    /// # Ok(())
171    /// # }
172    /// ```
173    pub fn builder() -> ConnectionBuilder {
174        ConnectionBuilder::new()
175    }
176
177    /// Создает новое соединение с управлением событиями
178    pub(crate) fn new(
179        config: Arc<RwLock<ClientConfig>>,
180        command_tx: mpsc::Sender<Message>,
181        mut result_rx: mpsc::Receiver<Message>,
182        timeout: Duration,
183        check_signal_fn: Option<CheckSignalFn>,
184        worker_guard: WorkerGuard,
185        client_handle: Option<tokio::task::JoinHandle<()>>,
186    ) -> Self {
187        // Create broadcast channel for event changes with a reasonable buffer
188        let (event_tx, _) = broadcast::channel(100);
189        let event_tx_clone = event_tx.clone();
190
191        // Start the receiver loop in a separate task
192        let command_tx_clone = command_tx.clone();
193        let receiver_handle = tokio::spawn(async move {
194            while let Some(msg) = result_rx.recv().await {
195                debug!("Received message: {:?}", msg);
196                let new_event = match msg {
197                    Message::ConnectState(st) => {
198                        if st == cloudpub_common::protocol::ConnectState::Connected as i32 {
199                            command_tx_clone
200                                .send(Message::EndpointStartAll(EndpointStartAll {}))
201                                .await
202                                .ok();
203                            ConnectionEvent::Connected
204                        } else if st == cloudpub_common::protocol::ConnectState::Disconnected as i32
205                        {
206                            ConnectionEvent::Closed
207                        } else {
208                            continue;
209                        }
210                    }
211                    Message::EndpointAck(endpoint) => ConnectionEvent::Endpoint(Box::new(endpoint)),
212                    Message::EndpointListAck(list) => ConnectionEvent::List(list.endpoints),
213                    Message::EndpointStopAck(_)
214                    | Message::EndpointRemoveAck(_)
215                    | Message::EndpointClearAck(_) => ConnectionEvent::Acknowledged,
216                    Message::Error(err) => ConnectionEvent::Error(err.message),
217                    Message::Break(_) => ConnectionEvent::Closed,
218                    _ => continue, // Игнорировать другие сообщения
219                };
220
221                // Отправить событие через широковещательный канал
222                debug!("New event: {:?}", new_event);
223                // Игнорировать ошибки отправки (нет получателей)
224                let _ = event_tx_clone.send(new_event);
225            }
226        });
227
228        Connection {
229            config,
230            command_tx,
231            event_tx,
232            receiver_handle: Some(receiver_handle),
233            timeout: Arc::new(Mutex::new(timeout)),
234            check_signal_fn,
235            _worker_guard: worker_guard,
236            client_handle,
237        }
238    }
239
240    /// Ожидает наступления определенного события с таймаутом.
241    ///
242    /// Этот метод блокируется до получения целевого события или истечения таймаута.
243    /// Он используется внутренне другими методами, но также может использоваться напрямую для
244    /// пользовательской обработки событий.
245    ///
246    /// # Аргументы
247    ///
248    /// * `target_event` - Предикатная функция, которая возвращает true при получении желаемого события
249    ///
250    /// # Возвращает
251    ///
252    /// Возвращает соответствующее событие или ошибку, если:
253    /// - Истек таймаут
254    /// - Соединение закрыто
255    /// - Получено событие ошибки
256    ///
257    /// # Пример
258    ///
259    /// ```no_run
260    /// # async fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
261    /// use cloudpub_sdk::ConnectionEvent;
262    ///
263    /// // Ожидать установки соединения
264    /// let event = conn.wait_for_event(|e| matches!(e, ConnectionEvent::Connected)).await?;
265    ///
266    /// // Ожидать любой операции с эндпоинтом
267    /// let event = conn.wait_for_event(|e| matches!(e, ConnectionEvent::Endpoint(_))).await?;
268    /// if let ConnectionEvent::Endpoint(endpoint) = event {
269    ///     println!("Эндпоинт зарегистрирован: {}", endpoint.guid);
270    /// }
271    /// # Ok(())
272    /// # }
273    /// ```
274    pub async fn wait_for_event(
275        &self,
276        target_event: impl Fn(&ConnectionEvent) -> bool + Send,
277    ) -> Result<ConnectionEvent> {
278        let timeout_duration = *self.timeout.lock().await;
279        let check_signal = self.check_signal_fn.clone();
280
281        // Создать нового подписчика для каждого вызова wait_for_event
282        let mut event_rx = self.event_tx.subscribe();
283
284        timeout(timeout_duration, async {
285            loop {
286                // Попытаться получить изменения событий с опциональной проверкой сигнала
287                let current_event = if let Some(ref check_fn) = check_signal {
288                    // Использовать tokio::select для ожидания изменения события или таймаута для проверки сигнала
289                    tokio::select! {
290                        Ok(event) = event_rx.recv() => {
291                            debug!("Received event: {:?}", event);
292                            event
293                        }
294                        _ = sleep(Duration::from_millis(100)) => {
295                            // Проверять сигнал каждые 100мс
296                            check_fn()?;
297                            // После проверки сигнала продолжить ожидание
298                            continue;
299                        }
300                    }
301                } else {
302                    // Без проверки сигнала, просто ожидать событие
303                    match event_rx.recv().await {
304                        Ok(event) => {
305                            debug!("Received event: {:?}", event);
306                            event
307                        }
308                        Err(broadcast::error::RecvError::Closed) => {
309                            bail!("Канал событий закрыт");
310                        }
311                        Err(broadcast::error::RecvError::Lagged(_)) => {
312                            // Некоторые сообщения были потеряны, продолжить
313                            continue;
314                        }
315                    }
316                };
317
318                // Проверить, находимся ли мы в событии ошибки
319                if let ConnectionEvent::Error(ref msg) = current_event {
320                    bail!("Операция не удалась: {}", msg);
321                }
322
323                // Проверить, находимся ли мы в событии закрытия
324                if current_event == ConnectionEvent::Closed {
325                    bail!("Соединение закрыто");
326                }
327
328                // Проверить, достигли ли мы целевого события
329                if target_event(&current_event) {
330                    return Ok(current_event);
331                }
332
333                // Событие не соответствует, продолжить чтение из канала
334                debug!("Событие не соответствует целевому, продолжаем...");
335            }
336        })
337        .await
338        .map_err(|_| anyhow::anyhow!("Таймаут ожидания события"))?
339    }
340
341    /// Устанавливает значение конфигурации.
342    ///
343    /// # Аргументы
344    ///
345    /// * `key` - Ключ конфигурации (например, "server", "port", "ssl")
346    /// * `value` - Значение конфигурации
347    ///
348    /// # Пример
349    ///
350    /// ```no_run
351    /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
352    /// conn.set("server", "api.example.com")?;
353    /// conn.set("port", "443")?;
354    /// conn.set("ssl", "true")?;
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub fn set(&self, key: &str, value: &str) -> Result<()> {
359        self.config.write().set(key, value)?;
360        Ok(())
361    }
362
363    /// Получает значение конфигурации.
364    ///
365    /// # Аргументы
366    ///
367    /// * `key` - Ключ конфигурации для получения
368    ///
369    /// # Возвращает
370    ///
371    /// Значение конфигурации или ошибку, если ключ не существует.
372    ///
373    /// # Пример
374    ///
375    /// ```no_run
376    /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
377    /// let server = conn.get("server")?;
378    /// println!("Подключено к серверу: {}", server);
379    /// # Ok(())
380    /// # }
381    /// ```
382    pub fn get(&self, key: &str) -> Result<String> {
383        self.config.read().get(key)
384    }
385
386    /// Получает все опции конфигурации в виде HashMap.
387    ///
388    /// # Возвращает
389    ///
390    /// HashMap, содержащий все текущие пары ключ-значение конфигурации.
391    ///
392    /// # Пример
393    ///
394    /// ```no_run
395    /// # fn example(conn: &cloudpub_sdk::Connection) {
396    /// let options = conn.options();
397    /// for (key, value) in options {
398    ///     println!("Config: {} = {}", key, value);
399    /// }
400    /// # }
401    /// ```
402    pub fn options(&self) -> HashMap<String, String> {
403        self.config.read().get_all_options().into_iter().collect()
404    }
405
406    /// Выходит из системы, очищая токен аутентификации.
407    ///
408    /// Это удаляет сохраненный токен аутентификации как из памяти, так и
409    /// из файла конфигурации. Соединение нужно будет повторно аутентифицировать
410    /// для дальнейших операций.
411    ///
412    /// # Пример
413    ///
414    /// ```no_run
415    /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
416    /// conn.logout()?;
417    /// println!("Выход выполнен успешно");
418    /// # Ok(())
419    /// # }
420    /// ```
421    pub fn logout(&self) -> Result<()> {
422        self.config.write().token = None;
423        self.config.write().save()?;
424        Ok(())
425    }
426
427    /// Регистрирует сервис на сервере CloudPub.
428    ///
429    /// Этот метод регистрирует локальный сервис для доступа через CloudPub.
430    /// Сервису будет присвоен уникальный GUID и URL для удаленного доступа.
431    ///
432    /// # Аргументы
433    ///
434    /// * `protocol` - Тип протокола (HTTP, HTTPS, TCP, UDP, WS, WSS, RTSP)
435    /// * `address` - Локальный адрес сервиса. Для RTSP с авторизацией используйте формат: `rtsp://user:pass@host:port/path`
436    /// * `name` - Опциональное человекочитаемое имя для сервиса
437    /// * `auth` - Опциональный метод аутентификации для доступа к сервису
438    /// * `acl` - Опциональный список контроля доступа для фильтрации IP
439    /// * `headers` - Опциональные HTTP заголовки для добавления к ответам
440    /// * `rules` - Опциональные правила фильтрации для фильтрации запросов
441    ///
442    /// # Возвращает
443    ///
444    /// Возвращает `ServerEndpoint`, содержащий:
445    /// - `guid`: Уникальный идентификатор для сервиса
446    /// - URL информацию для доступа к сервису
447    /// - Статус сервиса и метаданные
448    ///
449    /// # Пример
450    ///
451    /// ```no_run
452    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
453    /// use cloudpub_common::protocol::{Protocol, Auth, Endpoint, Acl, Header, FilterRule};
454    ///
455    /// // HTTP service
456    /// let endpoint = conn.register(
457    ///     Protocol::Http,
458    ///     "localhost:3000".to_string(),
459    ///     Some("My Web App".to_string()),
460    ///     Some(Auth::Basic),
461    ///     None,
462    ///     None,
463    ///     None,
464    /// ).await?;
465    ///
466    /// // HTTP service with ACL and headers
467    /// let acl = vec![Acl { user: "admin".to_string(), role: cloudpub_common::protocol::Role::Admin as i32 }];
468    /// let headers = vec![Header { name: "X-Custom".to_string(), value: "test".to_string() }];
469    /// let endpoint = conn.register(
470    ///     Protocol::Http,
471    ///     "localhost:8080".to_string(),
472    ///     Some("API Server".to_string()),
473    ///     Some(Auth::None),
474    ///     Some(acl),
475    ///     Some(headers),
476    ///     None,
477    /// ).await?;
478    ///
479    /// // RTSP service with credentials in URL
480    /// let rtsp_endpoint = conn.register(
481    ///     Protocol::Rtsp,
482    ///     "rtsp://camera:secret@localhost:554/stream".to_string(),
483    ///     Some("Security Camera".to_string()),
484    ///     Some(Auth::None),
485    ///     None,
486    ///     None,
487    ///     None,
488    /// ).await?;
489    ///
490    /// println!("Сервис зарегистрирован по адресу: {}", endpoint.as_url());
491    /// println!("GUID сервиса: {}", endpoint.guid);
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn register(
496        &mut self,
497        protocol: cloudpub_common::protocol::Protocol,
498        address: String,
499        name: Option<String>,
500        auth: Option<cloudpub_common::protocol::Auth>,
501        acl: Option<Vec<cloudpub_common::protocol::Acl>>,
502        headers: Option<Vec<cloudpub_common::protocol::Header>>,
503        rules: Option<Vec<cloudpub_common::protocol::FilterRule>>,
504    ) -> Result<cloudpub_common::protocol::ServerEndpoint> {
505        let publish_args = PublishArgs {
506            protocol,
507            address,
508            username: None,
509            password: None,
510            name,
511            auth,
512            acl: acl.unwrap_or_default(),
513            headers: headers.unwrap_or_default(),
514            rules: rules.unwrap_or_default(),
515        };
516
517        let endpoint_start = publish_args.parse()?;
518
519        self.command_tx
520            .send(Message::EndpointStart(endpoint_start))
521            .await?;
522
523        // Wait for response
524        let event = self
525            .wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
526            .await?;
527
528        if let ConnectionEvent::Endpoint(endpoint) = event {
529            Ok(*endpoint)
530        } else {
531            anyhow::bail!("Unexpected state")
532        }
533    }
534
535    /// Публикует сервис на сервере CloudPub.
536    ///
537    /// Это псевдоним для `register()`, который запускает сервис сразу после регистрации
538    ///
539    /// # Аргументы
540    ///
541    /// * `protocol` - Тип протокола (HTTP, HTTPS, TCP, UDP, WS, WSS, RTSP)
542    /// * `address` - Локальный адрес сервиса. Для RTSP с авторизацией используйте формат: `rtsp://user:pass@host:port/path`
543    /// * `name` - Опциональное человекочитаемое имя для сервиса
544    /// * `auth` - Опциональный метод аутентификации для доступа к сервису
545    /// * `acl` - Опциональный список контроля доступа для фильтрации IP
546    /// * `headers` - Опциональные HTTP заголовки для добавления к ответам
547    /// * `rules` - Опциональные правила фильтрации для фильтрации запросов
548    ///
549    /// # Возвращает
550    ///
551    /// Возвращает `ServerEndpoint` с деталями сервиса и URL доступа.
552    ///
553    /// # Пример
554    ///
555    /// ```no_run
556    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
557    /// use cloudpub_common::protocol::{Protocol, Auth, Endpoint, Acl, FilterRule};
558    ///
559    /// // Publish a TCP service
560    /// let endpoint = conn.publish(
561    ///     Protocol::Tcp,
562    ///     "localhost:8080".to_string(),
563    ///     Some("TCP Server".to_string()),
564    ///     Some(Auth::None),
565    ///     None,
566    ///     None,
567    ///     None,
568    /// ).await?;
569    ///
570    /// // Publish HTTP service with ACL
571    /// let acl = vec![Acl { user: "reader".to_string(), role: cloudpub_common::protocol::Role::Reader as i32 }];
572    /// let endpoint = conn.publish(
573    ///     Protocol::Http,
574    ///     "localhost:3000".to_string(),
575    ///     Some("Internal API".to_string()),
576    ///     Some(Auth::Basic),
577    ///     Some(acl),
578    ///     None,
579    ///     None,
580    /// ).await?;
581    ///
582    /// // Publish RTSP with embedded credentials
583    /// let rtsp = conn.publish(
584    ///     Protocol::Rtsp,
585    ///     "rtsp://admin:password@192.168.1.100:554/live/ch0".to_string(),
586    ///     Some("IP Camera".to_string()),
587    ///     Some(Auth::Basic),
588    ///     None,
589    ///     None,
590    ///     None,
591    /// ).await?;
592    ///
593    /// println!("TCP сервис доступен по адресу: {}", endpoint.as_url());
594    /// # Ok(())
595    /// # }
596    /// ```
597    pub async fn publish(
598        &mut self,
599        protocol: cloudpub_common::protocol::Protocol,
600        address: String,
601        name: Option<String>,
602        auth: Option<cloudpub_common::protocol::Auth>,
603        acl: Option<Vec<cloudpub_common::protocol::Acl>>,
604        headers: Option<Vec<cloudpub_common::protocol::Header>>,
605        rules: Option<Vec<cloudpub_common::protocol::FilterRule>>,
606    ) -> Result<cloudpub_common::protocol::ServerEndpoint> {
607        // Same as register for now
608        let endpoint = self
609            .register(protocol, address, name, auth, acl, headers, rules)
610            .await?;
611        self.start(endpoint.guid.clone()).await?;
612        Ok(endpoint)
613    }
614
615    /// Выводит список всех зарегистрированных сервисов.
616    ///
617    /// Возвращает список всех сервисов, в данный момент зарегистрированных на сервере,
618    /// включая их статус, URL и метаданные.
619    ///
620    /// # Возвращает
621    ///
622    /// Вектор структур `ServerEndpoint`, содержащих информацию о сервисах.
623    ///
624    /// # Пример
625    ///
626    /// ```no_run
627    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
628    /// use cloudpub_common::protocol::Endpoint;
629    ///
630    /// let services = conn.ls().await?;
631    ///
632    /// for service in services {
633    ///     let name = service.client.as_ref()
634    ///         .and_then(|c| c.description.clone())
635    ///         .unwrap_or_else(|| "Безымянный".to_string());
636    ///     println!("Сервис: {} ({})" ,
637    ///         name,
638    ///         service.guid
639    ///     );
640    ///     println!("  URL: {}", service.as_url());
641    ///     println!("  Статус: {}", service.status.unwrap_or_else(|| "Неизвестен".to_string()));
642    /// }
643    /// # Ok(())
644    /// # }
645    /// ```
646    pub async fn ls(&mut self) -> Result<Vec<cloudpub_common::protocol::ServerEndpoint>> {
647        self.command_tx
648            .send(Message::EndpointList(EndpointList {}))
649            .await?;
650        let event = self
651            .wait_for_event(|event| matches!(event, ConnectionEvent::List(_)))
652            .await?;
653
654        if let ConnectionEvent::List(list) = event {
655            Ok(list)
656        } else {
657            anyhow::bail!("Unexpected state")
658        }
659    }
660
661    /// Запускает сервис по его GUID.
662    ///
663    /// Запускает ранее зарегистрированный сервис, который мог быть остановлен.
664    /// Это делает сервис снова доступным через его CloudPub URL.
665    ///
666    /// # Аргументы
667    ///
668    /// * `guid` - Уникальный идентификатор сервиса для запуска
669    ///
670    /// # Пример
671    ///
672    /// ```no_run
673    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
674    /// let services = conn.ls().await?;
675    /// if let Some(service) = services.first() {
676    ///     conn.start(service.guid.clone()).await?;
677    ///     println!("Запущен сервис: {}", service.guid);
678    /// }
679    /// # Ok(())
680    /// # }
681    /// ```
682    pub async fn start(&mut self, guid: String) -> Result<()> {
683        self.command_tx
684            .send(Message::EndpointGuidStart(EndpointStart { guid }))
685            .await?;
686        // Wait for response
687        self.wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
688            .await?;
689        Ok(())
690    }
691
692    /// Останавливает сервис по его GUID.
693    ///
694    /// Временно останавливает сервис, делая его недоступным через CloudPub.
695    /// Регистрация сервиса сохраняется и может быть перезапущена позже.
696    ///
697    /// # Arguments
698    ///
699    /// * `guid` - Уникальный идентификатор сервиса для остановки
700    ///
701    /// # Пример
702    ///
703    /// ```no_run
704    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
705    /// conn.stop("service-guid-123".to_string()).await?;
706    /// println!("Сервис остановлен");
707    /// # Ok(())
708    /// # }
709    /// ```
710    pub async fn stop(&mut self, guid: String) -> Result<()> {
711        self.command_tx
712            .send(Message::EndpointStop(EndpointStop { guid }))
713            .await?;
714        self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
715            .await?;
716        Ok(())
717    }
718
719    /// Отменяет публикацию (удаляет) сервис по его GUID.
720    ///
721    /// Навсегда удаляет регистрацию сервиса с сервера.
722    /// Сервис больше не будет доступен через CloudPub.
723    ///
724    /// # Arguments
725    ///
726    /// * `guid` - Уникальный идентификатор сервиса для отмены публикации
727    ///
728    /// # Пример
729    ///
730    /// ```no_run
731    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
732    /// conn.unpublish("service-guid-123".to_string()).await?;
733    /// println!("Публикация сервиса отменена и удалена");
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub async fn unpublish(&mut self, guid: String) -> Result<()> {
738        self.command_tx
739            .send(Message::EndpointRemove(EndpointRemove { guid }))
740            .await?;
741        self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
742            .await?;
743        Ok(())
744    }
745
746    /// Удаляет все зарегистрированные сервисы.
747    ///
748    /// Этот метод удаляет все сервисы, зарегистрированные текущим пользователем.
749    /// Используйте с осторожностью, так как эта операция не может быть отменена.
750    ///
751    /// # Пример
752    ///
753    /// ```no_run
754    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
755    /// conn.clean().await?;
756    /// println!("Все сервисы удалены");
757    ///
758    /// let services = conn.ls().await?;
759    /// assert_eq!(services.len(), 0);
760    /// # Ok(())
761    /// # }
762    /// ```
763    pub async fn clean(&mut self) -> Result<()> {
764        self.command_tx
765            .send(Message::EndpointClear(EndpointClear {}))
766            .await?;
767        self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
768            .await?;
769        Ok(())
770    }
771
772    /// Пингует сервер для измерения задержки.
773    ///
774    /// Создает временную конечную точку пинга и измеряет время прохождения
775    /// туда и обратно до сервера. Полезно для проверки работоспособности соединения и задержки.
776    ///
777    /// # Возвращает
778    ///
779    /// Задержку пинга в микросекундах.
780    ///
781    /// # Пример
782    ///
783    /// ```no_run
784    /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
785    /// let latency_us = conn.ping().await?;
786    /// println!("Задержка сервера: {}μs ({:.2}мс)", latency_us, latency_us as f64 / 1000.0);
787    ///
788    /// if latency_us > 100_000 {  // 100ms
789    ///     println!("Предупреждение: Обнаружена высокая задержка");
790    /// }
791    /// # Ok(())
792    /// # }
793    /// ```
794    pub async fn ping(&mut self) -> Result<u64> {
795        ping::publish(self.command_tx.clone()).await?;
796        let event = self
797            .wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
798            .await?;
799        let endpoint = if let ConnectionEvent::Endpoint(ep) = event {
800            *ep
801        } else {
802            anyhow::bail!("Unexpected state")
803        };
804        Ok(ping::ping_test(endpoint, true).await?.parse::<u64>()?)
805    }
806
807    /// Очищает локальный каталог кэша.
808    ///
809    /// Удаляет все кэшированные данные, включая временные файлы и логи.
810    /// Это может быть полезно для устранения неполадок или освобождения дискового пространства.
811    ///
812    /// # Пример
813    ///
814    /// ```no_run
815    /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
816    /// conn.purge()?;
817    /// println!("Кэш успешно очищен");
818    /// # Ok(())
819    /// # }
820    /// ```
821    pub fn purge(&self) -> Result<()> {
822        let cache_dir = get_cache_dir("")?;
823        std::fs::remove_dir_all(&cache_dir).ok();
824        Ok(())
825    }
826}
827
828impl Drop for Connection {
829    fn drop(&mut self) {
830        // Send Break message to stop the client gracefully
831        self.command_tx
832            .try_send(Message::Break(Break {
833                guid: String::new(),
834            }))
835            .ok();
836
837        // Cancel the receiver task
838        if let Some(handle) = self.receiver_handle.take() {
839            handle.abort();
840        }
841
842        // Abort the client task if it's still running
843        if let Some(handle) = self.client_handle.take() {
844            handle.abort();
845        }
846    }
847}