aria2_ws/
lib.rs

1//!
//! An aria2 WebSocket JSON-RPC client in Rust.
3//!
4//! [aria2 RPC docs](https://aria2.github.io/manual/en/html/aria2c.html#methods)
5//!
6//! ## Features
7//!
//! - Almost all methods and structured responses
9//! - Auto reconnect
//! - Ensures the `on_download_complete` and `on_error` hooks are executed even after a reconnect.
11//! - Supports notifications
12//!
13//! ## Example
14//!
15//! ```no_run
16//! use std::sync::Arc;
17//!
18//! use aria2_ws::{Client, Callbacks, TaskOptions};
19//! use futures::FutureExt;
20//! use serde_json::json;
21//! use tokio::{spawn, sync::Semaphore};
22//!
23//! #[tokio::main]
24//! async fn main() {
25//!     let client = Client::connect("ws://127.0.0.1:6800/jsonrpc", None)
26//!         .await
27//!         .unwrap();
28//!     let options = TaskOptions {
29//!         split: Some(2),
30//!         header: Some(vec!["Referer: https://www.pixiv.net/".to_string()]),
31//!         all_proxy: Some("http://127.0.0.1:10809".to_string()),
32//!         // Add extra options which are not included in TaskOptions.
33//!         extra_options: json!({"max-download-limit": "200K"})
34//!             .as_object()
35//!             .unwrap()
36//!             .clone(),
37//!         ..Default::default()
38//!     };
39//!
40//!     // use `tokio::sync::Semaphore` to wait for all tasks to finish.
41//!     let semaphore = Arc::new(Semaphore::new(0));
42//!     client
43//!         .add_uri(
44//!             vec![
45//!                 "https://i.pximg.net/img-original/img/2020/05/15/06/56/03/81572512_p0.png"
46//!                     .to_string(),
47//!             ],
48//!             Some(options.clone()),
49//!             None,
50//!             Some(Callbacks {
51//!                 on_download_complete: Some({
52//!                     let s = semaphore.clone();
53//!                     async move {
54//!                         s.add_permits(1);
55//!                         println!("Task 1 completed!");
56//!                     }
57//!                     .boxed()
58//!                 }),
59//!                 on_error: Some({
60//!                     let s = semaphore.clone();
61//!                     async move {
62//!                         s.add_permits(1);
63//!                         println!("Task 1 error!");
64//!                     }
65//!                     .boxed()
66//!                 }),
67//!             }),
68//!         )
69//!         .await
70//!         .unwrap();
71//!
72//!     // Will 404
73//!     client
74//!         .add_uri(
75//!             vec![
76//!                 "https://i.pximg.net/img-original/img/2022/01/05/23/32/16/95326322_p0.pngxxxx"
77//!                     .to_string(),
78//!             ],
79//!             Some(options.clone()),
80//!             None,
81//!             Some(Callbacks {
82//!                 on_download_complete: Some({
83//!                     let s = semaphore.clone();
84//!                     async move {
85//!                         s.add_permits(1);
86//!                         println!("Task 2 completed!");
87//!                     }
88//!                     .boxed()
89//!                 }),
90//!                 on_error: Some({
91//!                     let s = semaphore.clone();
92//!                     async move {
93//!                         s.add_permits(1);
94//!                         println!("Task 2 error!");
95//!                     }
96//!                     .boxed()
97//!                 }),
98//!             }),
99//!         )
100//!         .await
101//!         .unwrap();
102//!
103//!     let mut not = client.subscribe_notifications();
104//!
105//!     spawn(async move {
106//!         while let Ok(msg) = not.recv().await {
107//!             println!("Received notification {:?}", &msg);
108//!         }
109//!     });
110//!
111//!     // Wait for 2 tasks to finish.
112//!     let _ = semaphore.acquire_many(2).await.unwrap();
113//!
114//!     client.shutdown().await.unwrap();
115//! }
116//!
117//! ```
118
119mod callback;
120mod client;
121mod error;
122mod method;
123mod options;
124pub mod response;
125mod utils;
126
127pub use error::Error;
128pub use options::TaskOptions;
// `Map` (from `serde_json`, re-exported below) is the map type used by `TaskOptions::extra_options`.
130pub use callback::Callbacks;
131pub use client::{Client, InnerClient};
132pub use serde_json::Map;
133
134use serde::{Deserialize, Serialize};
135use serde_json::Value;
136use snafu::OptionExt;
137
138pub(crate) type Result<T> = std::result::Result<T, Error>;
139
140#[derive(Serialize, Deserialize, Debug, Clone)]
141pub struct RpcRequest {
142    pub id: Option<i32>,
143    pub jsonrpc: String,
144    pub method: String,
145    #[serde(default)]
146    pub params: Vec<Value>,
147}
148
149/// Error returned by RPC calls.
150#[derive(Serialize, Deserialize, Debug, Clone)]
151pub struct Aria2Error {
152    pub code: i32,
153    pub message: String,
154}
155impl std::fmt::Display for Aria2Error {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        write!(
158            f,
159            "aria2 responsed error: code {}: {}",
160            self.code, self.message
161        )
162    }
163}
164impl std::error::Error for Aria2Error {}
165
166#[derive(Deserialize, Debug, Clone)]
167pub struct RpcResponse {
168    pub id: Option<i32>,
169    pub jsonrpc: String,
170    pub result: Option<Value>,
171    pub error: Option<Aria2Error>,
172}
173
/// Events about download progress from aria2.
///
/// Each variant corresponds to exactly one notification method name; the
/// `TryFrom<&str>` implementation performs the conversion.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Event {
    /// Parsed from `aria2.onDownloadStart`.
    Start,
    /// Parsed from `aria2.onDownloadPause`.
    Pause,
    /// Parsed from `aria2.onDownloadStop`.
    Stop,
    /// Parsed from `aria2.onDownloadComplete`.
    Complete,
    /// Parsed from `aria2.onDownloadError`.
    Error,
    /// Parsed from `aria2.onBtDownloadComplete`.
    ///
    /// This notification will be sent when a torrent download is complete
    /// but seeding is still going on.
    BtComplete,
}
185
186impl TryFrom<&str> for Event {
187    type Error = crate::Error;
188
189    fn try_from(value: &str) -> Result<Self> {
190        use Event::*;
191        let event = match value {
192            "aria2.onDownloadStart" => Start,
193            "aria2.onDownloadPause" => Pause,
194            "aria2.onDownloadStop" => Stop,
195            "aria2.onDownloadComplete" => Complete,
196            "aria2.onDownloadError" => Error,
197            "aria2.onBtDownloadComplete" => BtComplete,
198            _ => return error::ParseSnafu { value, to: "Event" }.fail(),
199        };
200        Ok(event)
201    }
202}
203
204#[derive(Debug, Clone, PartialEq, Eq)]
205pub enum Notification {
206    Aria2 { gid: String, event: Event },
207    WebSocketConnected,
208    WebsocketClosed,
209}
210
211impl TryFrom<&RpcRequest> for Notification {
212    type Error = crate::Error;
213
214    fn try_from(req: &RpcRequest) -> Result<Self> {
215        let gid = (|| req.params.get(0)?.get("gid")?.as_str())()
216            .with_context(|| error::ParseSnafu {
217                value: format!("{:?}", req),
218                to: "Notification",
219            })?
220            .to_string();
221        let event = req.method.as_str().try_into()?;
222        Ok(Notification::Aria2 { gid, event })
223    }
224}
225
#[cfg(test)]
mod tests {
    /// Compiles only when `T` is both `Send` and `Sync`; does nothing at
    /// runtime.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    /// Compile-time guard: the crate's error type and client must stay
    /// `Send + Sync`.
    #[test]
    fn t() {
        assert_send_sync::<crate::Client>();
        assert_send_sync::<crate::error::Error>();
    }
}
235}