Skip to main content

colab_cli/server/
mod.rs

1pub mod storage;
2
3use std::collections::HashSet;
4
5use chrono::{Duration, Utc};
6use uuid::Uuid;
7
8use crate::client::ColabClient;
9use crate::client::api::{Assignment, Shape, Variant};
10use crate::config::ColabConfig;
11use crate::error::{ColabError, Result};
12use crate::server::storage::{ServerStorage, StoredServer};
13
14pub struct ServerManager {
15    client: ColabClient,
16    storage: ServerStorage,
17}
18
19pub struct AssignOutcome {
20    pub server: StoredServer,
21    pub requested_shape: Shape,
22    pub reported_shape: Option<Shape>,
23    pub shape_mismatch: bool,
24}
25
26impl ServerManager {
27    pub fn new(client: ColabClient, config: &ColabConfig) -> Self {
28        Self {
29            client,
30            storage: ServerStorage::new(config.servers_file()),
31        }
32    }
33
34    pub async fn list(&self) -> Result<(Vec<StoredServer>, usize)> {
35        let live = self.client.list_assignments().await?;
36        let live_endpoints: HashSet<String> = live.iter().map(|a| a.endpoint.clone()).collect();
37        let removed = self.storage.reconcile(&live_endpoints)?;
38
39        for assignment in &live {
40            if let Some(proxy) = &assignment.runtime_proxy_info
41                && let Ok(Some(stored)) = self.storage.get_by_endpoint(&assignment.endpoint)
42            {
43                let updated = StoredServer {
44                    proxy_url: proxy.url.clone(),
45                    proxy_token: proxy.token.clone(),
46                    token_expires_at: Utc::now()
47                        + Duration::seconds(proxy.token_expires_in_seconds),
48                    ..stored
49                };
50                let _ = self.storage.upsert(updated);
51            }
52        }
53
54        let servers = self.storage.list()?;
55        Ok((servers, removed.len()))
56    }
57
58    pub fn list_local(&self) -> Result<Vec<StoredServer>> {
59        self.storage.list()
60    }
61
62    /// Borrow the inner Colab API client. Lets handlers reuse the
63    /// already-built `reqwest::Client` (rustls + http2 + connection
64    /// pool) instead of constructing a fresh one per command, which
65    /// was previously costing ~20-40 ms of cold-handshake setup on
66    /// every short-lived invocation.
67    pub fn client(&self) -> &ColabClient {
68        &self.client
69    }
70
71    pub async fn assign(
72        &self,
73        label: String,
74        variant: Variant,
75        accelerator: Option<String>,
76        shape: Shape,
77    ) -> Result<AssignOutcome> {
78        let notebook_hash = Uuid::new_v4();
79        let (assignment, _is_new) = self
80            .client
81            .assign(notebook_hash, variant, accelerator.as_deref(), shape)
82            .await?;
83
84        let reported = assignment.machine_shape;
85        let stored_shape = reported.unwrap_or(shape);
86        let shape_mismatch = matches!(reported, Some(r) if r != shape);
87
88        let server = self.assignment_to_stored(Uuid::new_v4(), label, &assignment, stored_shape);
89        self.storage.upsert(server.clone())?;
90        Ok(AssignOutcome {
91            server,
92            requested_shape: shape,
93            reported_shape: reported,
94            shape_mismatch,
95        })
96    }
97
98    pub async fn reconfigure(
99        &self,
100        id: Uuid,
101        variant: Variant,
102        accelerator: Option<String>,
103        shape: Shape,
104    ) -> Result<AssignOutcome> {
105        let existing = self
106            .storage
107            .get(id)?
108            .ok_or_else(|| ColabError::ServerNotFound {
109                endpoint: id.to_string(),
110            })?;
111        let label = existing.label.clone();
112        self.remove(id).await?;
113        self.assign(label, variant, accelerator, shape).await
114    }
115
116    pub async fn remove(&self, id: Uuid) -> Result<()> {
117        let server = self
118            .storage
119            .get(id)?
120            .ok_or_else(|| ColabError::ServerNotFound {
121                endpoint: id.to_string(),
122            })?;
123
124        self.storage.remove(id)?;
125
126        if let Ok(sessions) = self.client.list_sessions_via_tunnel(&server.endpoint).await {
127            for session in sessions {
128                let _ = self
129                    .client
130                    .delete_session(&server.proxy_url, &server.proxy_token, &session.id)
131                    .await;
132            }
133        }
134
135        self.client.unassign(&server.endpoint).await
136    }
137
138    pub async fn refresh(&self, id: Uuid) -> Result<StoredServer> {
139        let server = self
140            .storage
141            .get(id)?
142            .ok_or_else(|| ColabError::ServerNotFound {
143                endpoint: id.to_string(),
144            })?;
145
146        let proxy_info = self.client.refresh_connection(&server.endpoint).await?;
147        let updated = StoredServer {
148            proxy_url: proxy_info.url.clone(),
149            proxy_token: proxy_info.token.clone(),
150            token_expires_at: Utc::now() + Duration::seconds(proxy_info.token_expires_in_seconds),
151            ..server
152        };
153        self.storage.upsert(updated.clone())?;
154        Ok(updated)
155    }
156
157    fn assignment_to_stored(
158        &self,
159        id: Uuid,
160        label: String,
161        assignment: &Assignment,
162        shape: Shape,
163    ) -> StoredServer {
164        let proxy = &assignment.runtime_proxy_info;
165        StoredServer {
166            id,
167            label,
168            variant: assignment.variant,
169            accelerator: assignment.accelerator.clone(),
170            shape,
171            endpoint: assignment.endpoint.clone(),
172            proxy_url: proxy.url.clone(),
173            proxy_token: proxy.token.clone(),
174            token_expires_at: Utc::now() + Duration::seconds(proxy.token_expires_in_seconds),
175            date_assigned: Utc::now(),
176        }
177    }
178}