Skip to main content

walrus_daemon/ext/hub/
mod.rs

1//! Walrus hub — unified registry for all download operations.
2//!
3//! The "hub" encompasses walrus packages, proxied huggingface models,
4//! and future skill downloads. Each operation gets a unique ID, tracked
5//! status, and broadcasts events to subscribers.
6
7use std::{
8    collections::BTreeMap,
9    sync::atomic::{AtomicU64, Ordering},
10};
11use tokio::sync::broadcast;
12use tokio::time::Instant;
13use wcore::protocol::message::{
14    DownloadCompleted, DownloadCreated, DownloadEvent, DownloadFailed, DownloadInfo, DownloadKind,
15    DownloadProgress, DownloadStep, download_event,
16};
17
18pub mod manifest;
19pub mod package;
20
21// ── Registry ──────────────────────────────────────────────────────
22
23/// Download status.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum DownloadStatus {
26    /// Download in progress.
27    Downloading,
28    /// Download completed successfully.
29    Completed,
30    /// Download failed.
31    Failed,
32}
33
34impl std::fmt::Display for DownloadStatus {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            Self::Downloading => write!(f, "downloading"),
38            Self::Completed => write!(f, "completed"),
39            Self::Failed => write!(f, "failed"),
40        }
41    }
42}
43
44/// A tracked download operation.
45pub struct Download {
46    pub id: u64,
47    pub kind: DownloadKind,
48    pub label: String,
49    pub status: DownloadStatus,
50    pub bytes_downloaded: u64,
51    pub total_bytes: u64,
52    pub error: Option<String>,
53    pub created_at: Instant,
54}
55
56/// In-memory download registry with broadcast event channel.
57pub struct DownloadRegistry {
58    downloads: BTreeMap<u64, Download>,
59    next_id: AtomicU64,
60    broadcast: broadcast::Sender<DownloadEvent>,
61}
62
63impl Default for DownloadRegistry {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl DownloadRegistry {
70    /// Create a new registry.
71    pub fn new() -> Self {
72        let (broadcast, _) = broadcast::channel(64);
73        Self {
74            downloads: BTreeMap::new(),
75            next_id: AtomicU64::new(1),
76            broadcast,
77        }
78    }
79
80    /// Subscribe to download lifecycle events.
81    pub fn subscribe(&self) -> broadcast::Receiver<DownloadEvent> {
82        self.broadcast.subscribe()
83    }
84
85    /// Register a new download, returning its ID.
86    pub fn start(&mut self, kind: DownloadKind, label: String) -> u64 {
87        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
88        let download = Download {
89            id,
90            kind,
91            label: label.clone(),
92            status: DownloadStatus::Downloading,
93            bytes_downloaded: 0,
94            total_bytes: 0,
95            error: None,
96            created_at: Instant::now(),
97        };
98        self.downloads.insert(id, download);
99        let _ = self.broadcast.send(DownloadEvent {
100            event: Some(download_event::Event::Created(DownloadCreated {
101                id,
102                kind: kind as i32,
103                label,
104            })),
105        });
106        id
107    }
108
109    /// Report byte-level progress for a download.
110    pub fn progress(&mut self, id: u64, bytes: u64, total_bytes: u64) {
111        if let Some(dl) = self.downloads.get_mut(&id) {
112            dl.bytes_downloaded += bytes;
113            dl.total_bytes = total_bytes;
114        }
115        let _ = self.broadcast.send(DownloadEvent {
116            event: Some(download_event::Event::Progress(DownloadProgress {
117                id,
118                bytes,
119                total_bytes,
120            })),
121        });
122    }
123
124    /// Report a human-readable progress step.
125    pub fn step(&mut self, id: u64, message: String) {
126        let _ = self.broadcast.send(DownloadEvent {
127            event: Some(download_event::Event::Step(DownloadStep { id, message })),
128        });
129    }
130
131    /// Mark a download as completed.
132    pub fn complete(&mut self, id: u64) {
133        if let Some(dl) = self.downloads.get_mut(&id) {
134            dl.status = DownloadStatus::Completed;
135        }
136        let _ = self.broadcast.send(DownloadEvent {
137            event: Some(download_event::Event::Completed(DownloadCompleted { id })),
138        });
139    }
140
141    /// Mark a download as failed.
142    pub fn fail(&mut self, id: u64, error: String) {
143        if let Some(dl) = self.downloads.get_mut(&id) {
144            dl.status = DownloadStatus::Failed;
145            dl.error = Some(error.clone());
146        }
147        let _ = self.broadcast.send(DownloadEvent {
148            event: Some(download_event::Event::Failed(DownloadFailed { id, error })),
149        });
150    }
151
152    /// List all downloads, most recent first.
153    pub fn list(&self) -> Vec<DownloadInfo> {
154        self.downloads
155            .values()
156            .rev()
157            .map(|dl| DownloadInfo {
158                id: dl.id,
159                kind: dl.kind as i32,
160                label: dl.label.clone(),
161                status: dl.status.to_string(),
162                bytes_downloaded: dl.bytes_downloaded,
163                total_bytes: dl.total_bytes,
164                error: dl.error.clone(),
165                alive_secs: dl.created_at.elapsed().as_secs(),
166            })
167            .collect()
168    }
169}