cloudpub_sdk/connection.rs
1use anyhow::{bail, Result};
2use cloudpub_client::commands::PublishArgs;
3pub use cloudpub_client::config::ClientConfig;
4use cloudpub_client::ping;
5use cloudpub_client::shell::get_cache_dir;
6use cloudpub_common::logging::WorkerGuard;
7use cloudpub_common::protocol::message::Message;
8use cloudpub_common::protocol::{
9 Break, EndpointClear, EndpointList, EndpointRemove, EndpointStart, EndpointStartAll,
10 EndpointStop, ServerEndpoint,
11};
12use parking_lot::RwLock;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{broadcast, mpsc, Mutex};
17use tokio::time::{sleep, timeout};
18use tracing::debug;
19
20use crate::builder::ConnectionBuilder;
21
22/// Псевдоним типа для функции проверки сигналов.
23///
24/// Эта функция вызывается периодически для проверки сигналов прерывания
25/// (например, Ctrl+C в привязках Python). Возвращает ошибку для прерывания операции.
26///
27/// # Пример
28/// ```no_run
29/// use std::sync::Arc;
30/// use anyhow::Result;
31/// use cloudpub_sdk::CheckSignalFn;
32///
33/// fn interrupted() -> bool {
34/// // Ваша логика проверки прерывания здесь
35/// false
36/// }
37///
38/// let check_signal: CheckSignalFn = Arc::new(|| {
39/// if interrupted() {
40/// anyhow::bail!("Операция прервана пользователем")
41/// }
42/// Ok(())
43/// });
44/// ```
45pub type CheckSignalFn = Arc<dyn Fn() -> Result<()> + Send + Sync>;
46
47/// Представляет различные события, происходящие в течение жизненного цикла соединения.
48///
49/// События генерируются асинхронно при изменении состояния соединения или
50/// в ответ на операции. Используйте `Connection::wait_for_event()` для ожидания
51/// конкретных событий.
52///
53/// # Поток событий
54///
55/// 1. `Idle` → Начальное состояние (редко видно пользователям)
56/// 2. `Authenticating` → Когда начинается аутентификация
57/// 3. `Connected` → Успешно аутентифицирован и готов к операциям
58/// 4. `Endpoint` → Ответ на операции с конечной точкой (register, publish)
59/// 5. `List` → Ответ на операции листинга
60/// 6. `Acknowledged` → Операция успешно завершена
61/// 7. `Error` → Операция завершилась с ошибкой
62/// 8. `Closed` → Соединение прервано
63#[derive(Debug, Clone, PartialEq)]
64pub enum ConnectionEvent {
65 /// Начальное состояние перед установлением соединения.
66 /// Это состояние обычно не наблюдается пользователями.
67 #[allow(dead_code)]
68 Idle,
69 /// Идет аутентификация.
70 /// Это событие генерируется, когда клиент начинает аутентификацию с сервером.
71 Authenticating,
72 /// Успешно аутентифицирован и подключен к серверу.
73 /// После этого события соединение готово к операциям.
74 Connected,
75 /// Результат операции конечной точки (register, publish, ping).
76 /// Содержит детали конечной точки, включая ее GUID и URL.
77 Endpoint(Box<ServerEndpoint>),
78 /// Результат операции листинга.
79 /// Содержит все зарегистрированные конечные точки с их текущим статусом.
80 List(Vec<ServerEndpoint>),
81 /// Операция была подтверждена сервером.
82 /// Генерируется для операций, таких как stop, unpublish и clean.
83 Acknowledged,
84 /// Произошла ошибка во время операции.
85 /// Содержит сообщение об ошибке от сервера.
86 Error(String),
87 /// Соединение было закрыто.
88 /// Это может произойти из-за проблем с сетью, отключения сервера или явного разрыва соединения.
89 Closed,
90}
91
92/// Управляет соединением с сервером CloudPub.
93///
94/// Структура `Connection` предоставляет высокоуровневый интерфейс для взаимодействия с
95/// сервером CloudPub, обработки аутентификации, регистрации сервисов и
96/// управления жизненным циклом.
97///
98/// # Пример
99///
100/// ```no_run
101/// # async fn example() -> anyhow::Result<()> {
102/// use cloudpub_sdk::Connection;
103/// use cloudpub_common::protocol::{Protocol, Auth, Endpoint};
104///
105/// // Создание и настройка соединения
106/// let mut conn = Connection::builder()
107/// .credentials("user@example.com", "password")
108/// .timeout_secs(30)
109/// .build()
110/// .await?;
111///
112/// // Регистрация сервиса
113/// let endpoint = conn.publish(
114/// Protocol::Http,
115/// "localhost:8080".to_string(),
116/// Some("My Service".to_string()),
117/// Some(Auth::None),
118/// None,
119/// None,
120/// None,
121/// ).await?;
122///
123/// println!("Сервис доступен по адресу: {}", endpoint.as_url());
124///
125/// // Список всех сервисов
126/// let services = conn.ls().await?;
127/// for service in services {
128/// let name = service.client.as_ref()
129/// .and_then(|c| c.description.clone())
130/// .unwrap_or_else(|| "Безымянный".to_string());
131/// println!("Сервис: {} - {}", name, service.as_url());
132/// }
133/// # Ok(())
134/// # }
135/// ```
136///
137/// # Потокобезопасность
138///
139/// `Connection` использует внутреннюю синхронизацию и может быть безопасно разделен между
140/// потоками с использованием `Arc<Mutex<Connection>>` при необходимости.
141pub struct Connection {
142 pub config: Arc<RwLock<ClientConfig>>,
143 pub command_tx: mpsc::Sender<Message>,
144 pub(crate) event_tx: broadcast::Sender<ConnectionEvent>,
145 pub(crate) receiver_handle: Option<tokio::task::JoinHandle<()>>,
146 pub(crate) timeout: Arc<Mutex<Duration>>,
147 pub(crate) check_signal_fn: Option<CheckSignalFn>,
148 pub(crate) _worker_guard: WorkerGuard,
149 pub(crate) client_handle: Option<tokio::task::JoinHandle<()>>,
150}
151
152impl Connection {
153 /// Создает новый построитель соединения для настройки параметров соединения.
154 ///
155 /// Это рекомендуемый способ создания нового соединения.
156 ///
157 /// # Пример
158 ///
159 /// ```no_run
160 /// # async fn example() -> anyhow::Result<()> {
161 /// use cloudpub_sdk::Connection;
162 ///
163 /// let conn = Connection::builder()
164 /// .config_path("/path/to/config.toml")
165 /// .credentials("user@example.com", "password")
166 /// .timeout_secs(30)
167 /// .verbose(true)
168 /// .build()
169 /// .await?;
170 /// # Ok(())
171 /// # }
172 /// ```
173 pub fn builder() -> ConnectionBuilder {
174 ConnectionBuilder::new()
175 }
176
177 /// Создает новое соединение с управлением событиями
178 pub(crate) fn new(
179 config: Arc<RwLock<ClientConfig>>,
180 command_tx: mpsc::Sender<Message>,
181 mut result_rx: mpsc::Receiver<Message>,
182 timeout: Duration,
183 check_signal_fn: Option<CheckSignalFn>,
184 worker_guard: WorkerGuard,
185 client_handle: Option<tokio::task::JoinHandle<()>>,
186 ) -> Self {
187 // Create broadcast channel for event changes with a reasonable buffer
188 let (event_tx, _) = broadcast::channel(100);
189 let event_tx_clone = event_tx.clone();
190
191 // Start the receiver loop in a separate task
192 let command_tx_clone = command_tx.clone();
193 let receiver_handle = tokio::spawn(async move {
194 while let Some(msg) = result_rx.recv().await {
195 debug!("Received message: {:?}", msg);
196 let new_event = match msg {
197 Message::ConnectState(st) => {
198 if st == cloudpub_common::protocol::ConnectState::Connected as i32 {
199 command_tx_clone
200 .send(Message::EndpointStartAll(EndpointStartAll {}))
201 .await
202 .ok();
203 ConnectionEvent::Connected
204 } else if st == cloudpub_common::protocol::ConnectState::Disconnected as i32
205 {
206 ConnectionEvent::Closed
207 } else {
208 continue;
209 }
210 }
211 Message::EndpointAck(endpoint) => ConnectionEvent::Endpoint(Box::new(endpoint)),
212 Message::EndpointListAck(list) => ConnectionEvent::List(list.endpoints),
213 Message::EndpointStopAck(_)
214 | Message::EndpointRemoveAck(_)
215 | Message::EndpointClearAck(_) => ConnectionEvent::Acknowledged,
216 Message::Error(err) => ConnectionEvent::Error(err.message),
217 Message::Break(_) => ConnectionEvent::Closed,
218 _ => continue, // Игнорировать другие сообщения
219 };
220
221 // Отправить событие через широковещательный канал
222 debug!("New event: {:?}", new_event);
223 // Игнорировать ошибки отправки (нет получателей)
224 let _ = event_tx_clone.send(new_event);
225 }
226 });
227
228 Connection {
229 config,
230 command_tx,
231 event_tx,
232 receiver_handle: Some(receiver_handle),
233 timeout: Arc::new(Mutex::new(timeout)),
234 check_signal_fn,
235 _worker_guard: worker_guard,
236 client_handle,
237 }
238 }
239
240 /// Ожидает наступления определенного события с таймаутом.
241 ///
242 /// Этот метод блокируется до получения целевого события или истечения таймаута.
243 /// Он используется внутренне другими методами, но также может использоваться напрямую для
244 /// пользовательской обработки событий.
245 ///
246 /// # Аргументы
247 ///
248 /// * `target_event` - Предикатная функция, которая возвращает true при получении желаемого события
249 ///
250 /// # Возвращает
251 ///
252 /// Возвращает соответствующее событие или ошибку, если:
253 /// - Истек таймаут
254 /// - Соединение закрыто
255 /// - Получено событие ошибки
256 ///
257 /// # Пример
258 ///
259 /// ```no_run
260 /// # async fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
261 /// use cloudpub_sdk::ConnectionEvent;
262 ///
263 /// // Ожидать установки соединения
264 /// let event = conn.wait_for_event(|e| matches!(e, ConnectionEvent::Connected)).await?;
265 ///
266 /// // Ожидать любой операции с эндпоинтом
267 /// let event = conn.wait_for_event(|e| matches!(e, ConnectionEvent::Endpoint(_))).await?;
268 /// if let ConnectionEvent::Endpoint(endpoint) = event {
269 /// println!("Эндпоинт зарегистрирован: {}", endpoint.guid);
270 /// }
271 /// # Ok(())
272 /// # }
273 /// ```
274 pub async fn wait_for_event(
275 &self,
276 target_event: impl Fn(&ConnectionEvent) -> bool + Send,
277 ) -> Result<ConnectionEvent> {
278 let timeout_duration = *self.timeout.lock().await;
279 let check_signal = self.check_signal_fn.clone();
280
281 // Создать нового подписчика для каждого вызова wait_for_event
282 let mut event_rx = self.event_tx.subscribe();
283
284 timeout(timeout_duration, async {
285 loop {
286 // Попытаться получить изменения событий с опциональной проверкой сигнала
287 let current_event = if let Some(ref check_fn) = check_signal {
288 // Использовать tokio::select для ожидания изменения события или таймаута для проверки сигнала
289 tokio::select! {
290 Ok(event) = event_rx.recv() => {
291 debug!("Received event: {:?}", event);
292 event
293 }
294 _ = sleep(Duration::from_millis(100)) => {
295 // Проверять сигнал каждые 100мс
296 check_fn()?;
297 // После проверки сигнала продолжить ожидание
298 continue;
299 }
300 }
301 } else {
302 // Без проверки сигнала, просто ожидать событие
303 match event_rx.recv().await {
304 Ok(event) => {
305 debug!("Received event: {:?}", event);
306 event
307 }
308 Err(broadcast::error::RecvError::Closed) => {
309 bail!("Канал событий закрыт");
310 }
311 Err(broadcast::error::RecvError::Lagged(_)) => {
312 // Некоторые сообщения были потеряны, продолжить
313 continue;
314 }
315 }
316 };
317
318 // Проверить, находимся ли мы в событии ошибки
319 if let ConnectionEvent::Error(ref msg) = current_event {
320 bail!("Операция не удалась: {}", msg);
321 }
322
323 // Проверить, находимся ли мы в событии закрытия
324 if current_event == ConnectionEvent::Closed {
325 bail!("Соединение закрыто");
326 }
327
328 // Проверить, достигли ли мы целевого события
329 if target_event(¤t_event) {
330 return Ok(current_event);
331 }
332
333 // Событие не соответствует, продолжить чтение из канала
334 debug!("Событие не соответствует целевому, продолжаем...");
335 }
336 })
337 .await
338 .map_err(|_| anyhow::anyhow!("Таймаут ожидания события"))?
339 }
340
341 /// Устанавливает значение конфигурации.
342 ///
343 /// # Аргументы
344 ///
345 /// * `key` - Ключ конфигурации (например, "server", "port", "ssl")
346 /// * `value` - Значение конфигурации
347 ///
348 /// # Пример
349 ///
350 /// ```no_run
351 /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
352 /// conn.set("server", "api.example.com")?;
353 /// conn.set("port", "443")?;
354 /// conn.set("ssl", "true")?;
355 /// # Ok(())
356 /// # }
357 /// ```
358 pub fn set(&self, key: &str, value: &str) -> Result<()> {
359 self.config.write().set(key, value)?;
360 Ok(())
361 }
362
363 /// Получает значение конфигурации.
364 ///
365 /// # Аргументы
366 ///
367 /// * `key` - Ключ конфигурации для получения
368 ///
369 /// # Возвращает
370 ///
371 /// Значение конфигурации или ошибку, если ключ не существует.
372 ///
373 /// # Пример
374 ///
375 /// ```no_run
376 /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
377 /// let server = conn.get("server")?;
378 /// println!("Подключено к серверу: {}", server);
379 /// # Ok(())
380 /// # }
381 /// ```
382 pub fn get(&self, key: &str) -> Result<String> {
383 self.config.read().get(key)
384 }
385
386 /// Получает все опции конфигурации в виде HashMap.
387 ///
388 /// # Возвращает
389 ///
390 /// HashMap, содержащий все текущие пары ключ-значение конфигурации.
391 ///
392 /// # Пример
393 ///
394 /// ```no_run
395 /// # fn example(conn: &cloudpub_sdk::Connection) {
396 /// let options = conn.options();
397 /// for (key, value) in options {
398 /// println!("Config: {} = {}", key, value);
399 /// }
400 /// # }
401 /// ```
402 pub fn options(&self) -> HashMap<String, String> {
403 self.config.read().get_all_options().into_iter().collect()
404 }
405
406 /// Выходит из системы, очищая токен аутентификации.
407 ///
408 /// Это удаляет сохраненный токен аутентификации как из памяти, так и
409 /// из файла конфигурации. Соединение нужно будет повторно аутентифицировать
410 /// для дальнейших операций.
411 ///
412 /// # Пример
413 ///
414 /// ```no_run
415 /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
416 /// conn.logout()?;
417 /// println!("Выход выполнен успешно");
418 /// # Ok(())
419 /// # }
420 /// ```
421 pub fn logout(&self) -> Result<()> {
422 self.config.write().token = None;
423 self.config.write().save()?;
424 Ok(())
425 }
426
427 /// Регистрирует сервис на сервере CloudPub.
428 ///
429 /// Этот метод регистрирует локальный сервис для доступа через CloudPub.
430 /// Сервису будет присвоен уникальный GUID и URL для удаленного доступа.
431 ///
432 /// # Аргументы
433 ///
434 /// * `protocol` - Тип протокола (HTTP, HTTPS, TCP, UDP, WS, WSS, RTSP)
435 /// * `address` - Локальный адрес сервиса. Для RTSP с авторизацией используйте формат: `rtsp://user:pass@host:port/path`
436 /// * `name` - Опциональное человекочитаемое имя для сервиса
437 /// * `auth` - Опциональный метод аутентификации для доступа к сервису
438 /// * `acl` - Опциональный список контроля доступа для фильтрации IP
439 /// * `headers` - Опциональные HTTP заголовки для добавления к ответам
440 /// * `rules` - Опциональные правила фильтрации для фильтрации запросов
441 ///
442 /// # Возвращает
443 ///
444 /// Возвращает `ServerEndpoint`, содержащий:
445 /// - `guid`: Уникальный идентификатор для сервиса
446 /// - URL информацию для доступа к сервису
447 /// - Статус сервиса и метаданные
448 ///
449 /// # Пример
450 ///
451 /// ```no_run
452 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
453 /// use cloudpub_common::protocol::{Protocol, Auth, Endpoint, Acl, Header, FilterRule};
454 ///
455 /// // HTTP service
456 /// let endpoint = conn.register(
457 /// Protocol::Http,
458 /// "localhost:3000".to_string(),
459 /// Some("My Web App".to_string()),
460 /// Some(Auth::Basic),
461 /// None,
462 /// None,
463 /// None,
464 /// ).await?;
465 ///
466 /// // HTTP service with ACL and headers
467 /// let acl = vec![Acl { user: "admin".to_string(), role: cloudpub_common::protocol::Role::Admin as i32 }];
468 /// let headers = vec![Header { name: "X-Custom".to_string(), value: "test".to_string() }];
469 /// let endpoint = conn.register(
470 /// Protocol::Http,
471 /// "localhost:8080".to_string(),
472 /// Some("API Server".to_string()),
473 /// Some(Auth::None),
474 /// Some(acl),
475 /// Some(headers),
476 /// None,
477 /// ).await?;
478 ///
479 /// // RTSP service with credentials in URL
480 /// let rtsp_endpoint = conn.register(
481 /// Protocol::Rtsp,
482 /// "rtsp://camera:secret@localhost:554/stream".to_string(),
483 /// Some("Security Camera".to_string()),
484 /// Some(Auth::None),
485 /// None,
486 /// None,
487 /// None,
488 /// ).await?;
489 ///
490 /// println!("Сервис зарегистрирован по адресу: {}", endpoint.as_url());
491 /// println!("GUID сервиса: {}", endpoint.guid);
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub async fn register(
496 &mut self,
497 protocol: cloudpub_common::protocol::Protocol,
498 address: String,
499 name: Option<String>,
500 auth: Option<cloudpub_common::protocol::Auth>,
501 acl: Option<Vec<cloudpub_common::protocol::Acl>>,
502 headers: Option<Vec<cloudpub_common::protocol::Header>>,
503 rules: Option<Vec<cloudpub_common::protocol::FilterRule>>,
504 ) -> Result<cloudpub_common::protocol::ServerEndpoint> {
505 let publish_args = PublishArgs {
506 protocol,
507 address,
508 username: None,
509 password: None,
510 name,
511 auth,
512 acl: acl.unwrap_or_default(),
513 headers: headers.unwrap_or_default(),
514 rules: rules.unwrap_or_default(),
515 };
516
517 let endpoint_start = publish_args.parse()?;
518
519 self.command_tx
520 .send(Message::EndpointStart(endpoint_start))
521 .await?;
522
523 // Wait for response
524 let event = self
525 .wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
526 .await?;
527
528 if let ConnectionEvent::Endpoint(endpoint) = event {
529 Ok(*endpoint)
530 } else {
531 anyhow::bail!("Unexpected state")
532 }
533 }
534
535 /// Публикует сервис на сервере CloudPub.
536 ///
537 /// Это псевдоним для `register()`, который запускает сервис сразу после регистрации
538 ///
539 /// # Аргументы
540 ///
541 /// * `protocol` - Тип протокола (HTTP, HTTPS, TCP, UDP, WS, WSS, RTSP)
542 /// * `address` - Локальный адрес сервиса. Для RTSP с авторизацией используйте формат: `rtsp://user:pass@host:port/path`
543 /// * `name` - Опциональное человекочитаемое имя для сервиса
544 /// * `auth` - Опциональный метод аутентификации для доступа к сервису
545 /// * `acl` - Опциональный список контроля доступа для фильтрации IP
546 /// * `headers` - Опциональные HTTP заголовки для добавления к ответам
547 /// * `rules` - Опциональные правила фильтрации для фильтрации запросов
548 ///
549 /// # Возвращает
550 ///
551 /// Возвращает `ServerEndpoint` с деталями сервиса и URL доступа.
552 ///
553 /// # Пример
554 ///
555 /// ```no_run
556 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
557 /// use cloudpub_common::protocol::{Protocol, Auth, Endpoint, Acl, FilterRule};
558 ///
559 /// // Publish a TCP service
560 /// let endpoint = conn.publish(
561 /// Protocol::Tcp,
562 /// "localhost:8080".to_string(),
563 /// Some("TCP Server".to_string()),
564 /// Some(Auth::None),
565 /// None,
566 /// None,
567 /// None,
568 /// ).await?;
569 ///
570 /// // Publish HTTP service with ACL
571 /// let acl = vec![Acl { user: "reader".to_string(), role: cloudpub_common::protocol::Role::Reader as i32 }];
572 /// let endpoint = conn.publish(
573 /// Protocol::Http,
574 /// "localhost:3000".to_string(),
575 /// Some("Internal API".to_string()),
576 /// Some(Auth::Basic),
577 /// Some(acl),
578 /// None,
579 /// None,
580 /// ).await?;
581 ///
582 /// // Publish RTSP with embedded credentials
583 /// let rtsp = conn.publish(
584 /// Protocol::Rtsp,
585 /// "rtsp://admin:password@192.168.1.100:554/live/ch0".to_string(),
586 /// Some("IP Camera".to_string()),
587 /// Some(Auth::Basic),
588 /// None,
589 /// None,
590 /// None,
591 /// ).await?;
592 ///
593 /// println!("TCP сервис доступен по адресу: {}", endpoint.as_url());
594 /// # Ok(())
595 /// # }
596 /// ```
597 pub async fn publish(
598 &mut self,
599 protocol: cloudpub_common::protocol::Protocol,
600 address: String,
601 name: Option<String>,
602 auth: Option<cloudpub_common::protocol::Auth>,
603 acl: Option<Vec<cloudpub_common::protocol::Acl>>,
604 headers: Option<Vec<cloudpub_common::protocol::Header>>,
605 rules: Option<Vec<cloudpub_common::protocol::FilterRule>>,
606 ) -> Result<cloudpub_common::protocol::ServerEndpoint> {
607 // Same as register for now
608 let endpoint = self
609 .register(protocol, address, name, auth, acl, headers, rules)
610 .await?;
611 self.start(endpoint.guid.clone()).await?;
612 Ok(endpoint)
613 }
614
615 /// Выводит список всех зарегистрированных сервисов.
616 ///
617 /// Возвращает список всех сервисов, в данный момент зарегистрированных на сервере,
618 /// включая их статус, URL и метаданные.
619 ///
620 /// # Возвращает
621 ///
622 /// Вектор структур `ServerEndpoint`, содержащих информацию о сервисах.
623 ///
624 /// # Пример
625 ///
626 /// ```no_run
627 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
628 /// use cloudpub_common::protocol::Endpoint;
629 ///
630 /// let services = conn.ls().await?;
631 ///
632 /// for service in services {
633 /// let name = service.client.as_ref()
634 /// .and_then(|c| c.description.clone())
635 /// .unwrap_or_else(|| "Безымянный".to_string());
636 /// println!("Сервис: {} ({})" ,
637 /// name,
638 /// service.guid
639 /// );
640 /// println!(" URL: {}", service.as_url());
641 /// println!(" Статус: {}", service.status.unwrap_or_else(|| "Неизвестен".to_string()));
642 /// }
643 /// # Ok(())
644 /// # }
645 /// ```
646 pub async fn ls(&mut self) -> Result<Vec<cloudpub_common::protocol::ServerEndpoint>> {
647 self.command_tx
648 .send(Message::EndpointList(EndpointList {}))
649 .await?;
650 let event = self
651 .wait_for_event(|event| matches!(event, ConnectionEvent::List(_)))
652 .await?;
653
654 if let ConnectionEvent::List(list) = event {
655 Ok(list)
656 } else {
657 anyhow::bail!("Unexpected state")
658 }
659 }
660
661 /// Запускает сервис по его GUID.
662 ///
663 /// Запускает ранее зарегистрированный сервис, который мог быть остановлен.
664 /// Это делает сервис снова доступным через его CloudPub URL.
665 ///
666 /// # Аргументы
667 ///
668 /// * `guid` - Уникальный идентификатор сервиса для запуска
669 ///
670 /// # Пример
671 ///
672 /// ```no_run
673 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
674 /// let services = conn.ls().await?;
675 /// if let Some(service) = services.first() {
676 /// conn.start(service.guid.clone()).await?;
677 /// println!("Запущен сервис: {}", service.guid);
678 /// }
679 /// # Ok(())
680 /// # }
681 /// ```
682 pub async fn start(&mut self, guid: String) -> Result<()> {
683 self.command_tx
684 .send(Message::EndpointGuidStart(EndpointStart { guid }))
685 .await?;
686 // Wait for response
687 self.wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
688 .await?;
689 Ok(())
690 }
691
692 /// Останавливает сервис по его GUID.
693 ///
694 /// Временно останавливает сервис, делая его недоступным через CloudPub.
695 /// Регистрация сервиса сохраняется и может быть перезапущена позже.
696 ///
697 /// # Arguments
698 ///
699 /// * `guid` - Уникальный идентификатор сервиса для остановки
700 ///
701 /// # Пример
702 ///
703 /// ```no_run
704 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
705 /// conn.stop("service-guid-123".to_string()).await?;
706 /// println!("Сервис остановлен");
707 /// # Ok(())
708 /// # }
709 /// ```
710 pub async fn stop(&mut self, guid: String) -> Result<()> {
711 self.command_tx
712 .send(Message::EndpointStop(EndpointStop { guid }))
713 .await?;
714 self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
715 .await?;
716 Ok(())
717 }
718
719 /// Отменяет публикацию (удаляет) сервис по его GUID.
720 ///
721 /// Навсегда удаляет регистрацию сервиса с сервера.
722 /// Сервис больше не будет доступен через CloudPub.
723 ///
724 /// # Arguments
725 ///
726 /// * `guid` - Уникальный идентификатор сервиса для отмены публикации
727 ///
728 /// # Пример
729 ///
730 /// ```no_run
731 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
732 /// conn.unpublish("service-guid-123".to_string()).await?;
733 /// println!("Публикация сервиса отменена и удалена");
734 /// # Ok(())
735 /// # }
736 /// ```
737 pub async fn unpublish(&mut self, guid: String) -> Result<()> {
738 self.command_tx
739 .send(Message::EndpointRemove(EndpointRemove { guid }))
740 .await?;
741 self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
742 .await?;
743 Ok(())
744 }
745
746 /// Удаляет все зарегистрированные сервисы.
747 ///
748 /// Этот метод удаляет все сервисы, зарегистрированные текущим пользователем.
749 /// Используйте с осторожностью, так как эта операция не может быть отменена.
750 ///
751 /// # Пример
752 ///
753 /// ```no_run
754 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
755 /// conn.clean().await?;
756 /// println!("Все сервисы удалены");
757 ///
758 /// let services = conn.ls().await?;
759 /// assert_eq!(services.len(), 0);
760 /// # Ok(())
761 /// # }
762 /// ```
763 pub async fn clean(&mut self) -> Result<()> {
764 self.command_tx
765 .send(Message::EndpointClear(EndpointClear {}))
766 .await?;
767 self.wait_for_event(|event| matches!(event, ConnectionEvent::Acknowledged))
768 .await?;
769 Ok(())
770 }
771
772 /// Пингует сервер для измерения задержки.
773 ///
774 /// Создает временную конечную точку пинга и измеряет время прохождения
775 /// туда и обратно до сервера. Полезно для проверки работоспособности соединения и задержки.
776 ///
777 /// # Возвращает
778 ///
779 /// Задержку пинга в микросекундах.
780 ///
781 /// # Пример
782 ///
783 /// ```no_run
784 /// # async fn example(conn: &mut cloudpub_sdk::Connection) -> anyhow::Result<()> {
785 /// let latency_us = conn.ping().await?;
786 /// println!("Задержка сервера: {}μs ({:.2}мс)", latency_us, latency_us as f64 / 1000.0);
787 ///
788 /// if latency_us > 100_000 { // 100ms
789 /// println!("Предупреждение: Обнаружена высокая задержка");
790 /// }
791 /// # Ok(())
792 /// # }
793 /// ```
794 pub async fn ping(&mut self) -> Result<u64> {
795 ping::publish(self.command_tx.clone()).await?;
796 let event = self
797 .wait_for_event(|event| matches!(event, ConnectionEvent::Endpoint(_)))
798 .await?;
799 let endpoint = if let ConnectionEvent::Endpoint(ep) = event {
800 *ep
801 } else {
802 anyhow::bail!("Unexpected state")
803 };
804 Ok(ping::ping_test(endpoint, true).await?.parse::<u64>()?)
805 }
806
807 /// Очищает локальный каталог кэша.
808 ///
809 /// Удаляет все кэшированные данные, включая временные файлы и логи.
810 /// Это может быть полезно для устранения неполадок или освобождения дискового пространства.
811 ///
812 /// # Пример
813 ///
814 /// ```no_run
815 /// # fn example(conn: &cloudpub_sdk::Connection) -> anyhow::Result<()> {
816 /// conn.purge()?;
817 /// println!("Кэш успешно очищен");
818 /// # Ok(())
819 /// # }
820 /// ```
821 pub fn purge(&self) -> Result<()> {
822 let cache_dir = get_cache_dir("")?;
823 std::fs::remove_dir_all(&cache_dir).ok();
824 Ok(())
825 }
826}
827
828impl Drop for Connection {
829 fn drop(&mut self) {
830 // Send Break message to stop the client gracefully
831 self.command_tx
832 .try_send(Message::Break(Break {
833 guid: String::new(),
834 }))
835 .ok();
836
837 // Cancel the receiver task
838 if let Some(handle) = self.receiver_handle.take() {
839 handle.abort();
840 }
841
842 // Abort the client task if it's still running
843 if let Some(handle) = self.client_handle.take() {
844 handle.abort();
845 }
846 }
847}