urbit_http_api/
channel.rs

1use crate::apps::chat::Chat;
2use crate::apps::collections::Collection;
3use crate::apps::notebook::Notebook;
4use crate::error::{Result, UrbitAPIError};
5use crate::graphstore::GraphStore;
6use crate::interface::ShipInterface;
7use crate::subscription::{CreationID, Subscription};
8use eventsource_threaded::{EventSource, ReceiverSource};
9use json::{object, JsonValue};
10use rand::Rng;
11use reqwest::blocking::Response;
12use reqwest::header::HeaderMap;
13use reqwest::Url;
14use std::time::SystemTime;
15
/// A Channel which is used to interact with a ship
#[derive(Debug)]
pub struct Channel {
    /// `ShipInterface` this channel is created from
    pub ship_interface: ShipInterface,
    /// The uid of the channel
    pub uid: String,
    /// The url of the channel
    pub url: String,
    /// The list of `Subscription`s for this channel
    pub subscription_list: Vec<Subscription>,
    /// The `EventSource` receiver for this channel which reads all of
    /// the SSE events.
    event_receiver: ReceiverSource,
    /// The id that the next outgoing message on this `Channel` will carry.
    /// It is increased by one every time a message is sent, so it also
    /// tracks how many message ids have been handed out so far.
    pub message_id_count: u64,
}
34
35/// Channel methods for basic functionality
36impl Channel {
37    /// Create a new channel
38    pub fn new(ship_interface: ShipInterface) -> Result<Channel> {
39        let mut rng = rand::thread_rng();
40        // Defining the uid as UNIX time, or random if error
41        let uid = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
42            Ok(n) => n.as_micros(),
43            Err(_) => rng.gen(),
44        }
45        .to_string();
46
47        // Channel url
48        let channel_url = format!("{}/~/channel/{}", &ship_interface.url, uid);
49        // Opening channel request json
50        let mut body = json::parse(r#"[]"#).unwrap();
51        body[0] = object! {
52                "id": 1,
53                "action": "poke",
54                "ship": ship_interface.ship_name.clone(),
55                "app": "hood",
56                "mark": "helm-hi",
57                "json": "Opening channel",
58        };
59
60        // Make the put request to create the channel.
61        let resp = ship_interface.send_put_request(&channel_url, &body)?;
62
63        if resp.status().as_u16() == 204 {
64            // Create cookie header with the ship session auth val
65            let mut headers = HeaderMap::new();
66            headers.append("cookie", ship_interface.session_auth.clone());
67            // Create the receiver
68            let url_structured =
69                Url::parse(&channel_url).map_err(|_| UrbitAPIError::FailedToCreateNewChannel)?;
70            let receiver = EventSource::new(url_structured, headers);
71
72            return Ok(Channel {
73                ship_interface: ship_interface,
74                uid: uid,
75                url: channel_url,
76                subscription_list: vec![],
77                event_receiver: receiver,
78                message_id_count: 2,
79            });
80        } else {
81            return Err(UrbitAPIError::FailedToCreateNewChannel);
82        }
83    }
84
85    /// Sends a poke over the channel
86    pub fn poke(&mut self, app: &str, mark: &str, json: &JsonValue) -> Result<Response> {
87        let mut body = json::parse(r#"[]"#).unwrap();
88        body[0] = object! {
89                "id": self.get_and_raise_message_id_count(),
90                "action": "poke",
91                "ship": self.ship_interface.ship_name.clone(),
92                "app": app,
93                "mark": mark,
94                "json": json.clone(),
95        };
96
97        // Make the put request for the poke
98        self.ship_interface.send_put_request(&self.url, &body)
99    }
100
101    /// Sends a scry to the ship
102    pub fn scry(&self, app: &str, path: &str, mark: &str) -> Result<Response> {
103        self.ship_interface.scry(app, path, mark)
104    }
105
106    /// Run a thread via spider
107    pub fn spider(
108        &self,
109        input_mark: &str,
110        output_mark: &str,
111        thread_name: &str,
112        body: &JsonValue,
113    ) -> Result<Response> {
114        self.ship_interface
115            .spider(input_mark, output_mark, thread_name, body)
116    }
117
118    /// Create a new `Subscription` and thus subscribes to events on the
119    /// ship with the provided app/path.
120    pub fn create_new_subscription(&mut self, app: &str, path: &str) -> Result<CreationID> {
121        // Saves the message id to be reused
122        let creation_id = self.get_and_raise_message_id_count();
123        // Create the json body
124        let mut body = json::parse(r#"[]"#).unwrap();
125        body[0] = object! {
126                "id": creation_id,
127                "action": "subscribe",
128                "ship": self.ship_interface.ship_name.clone(),
129                "app": app.to_string(),
130                "path": path.to_string(),
131        };
132
133        // Make the put request to create the channel.
134        let resp = self.ship_interface.send_put_request(&self.url, &body)?;
135
136        if resp.status().as_u16() == 204 {
137            // Create the `Subscription`
138            let sub = Subscription {
139                channel_uid: self.uid.clone(),
140                creation_id: creation_id,
141                app: app.to_string(),
142                path: path.to_string(),
143                message_list: vec![],
144            };
145            // Add the `Subscription` to the list
146            self.subscription_list.push(sub.clone());
147            return Ok(creation_id);
148        } else {
149            return Err(UrbitAPIError::FailedToCreateNewSubscription);
150        }
151    }
152
153    /// Parses SSE messages for this channel and moves them into
154    /// the proper corresponding `Subscription`'s `message_list`.
155    pub fn parse_event_messages(&mut self) {
156        let rec = &mut self.event_receiver;
157
158        // Consume all messages
159        loop {
160            if let Ok(event_res) = rec.try_recv() {
161                if let Err(e) = &event_res {
162                    println!("Error Event: {}", e);
163                }
164                if let Ok(event) = event_res {
165                    // Go through all subscriptions and find which
166                    // subscription this event is for.
167                    for sub in &mut self.subscription_list {
168                        // If adding the message succeeded (because found
169                        // correct `Subscription`) then stop.
170                        if let Some(_) = sub.add_to_message_list(&event) {
171                            // Send an ack for the processed event
172                            // Using unwrap because `add_to_message_list`
173                            // already does error checking.
174                            let eid: u64 = event.id.unwrap().parse().unwrap();
175                            let mut json = json::parse(r#"[]"#).unwrap();
176                            json[0] = object! {
177                                "id": self.message_id_count,
178                                "action": "ack",
179                                "event-id": eid,
180                            };
181                            self.message_id_count += 1;
182                            let _ack_res = self.ship_interface.send_put_request(&self.url, &json);
183                            break;
184                        }
185                    }
186                }
187                continue;
188            }
189            break;
190        }
191    }
192
193    /// Finds the first `Subscription` in the list which has a matching
194    /// `app` and `path`;
195    pub fn find_subscription(&mut self, app: &str, path: &str) -> Option<&mut Subscription> {
196        for sub in &mut self.subscription_list {
197            if sub.app == app && sub.path == path {
198                return Some(sub);
199            }
200        }
201        None
202    }
203
204    /// Finds the first `Subscription` in the list which has a matching
205    /// `app` and `path`, removes it from the list, and tells the ship
206    /// that you are unsubscribing. Returns `None` if failed to find
207    /// a subscription with a matching app & path.
208    pub fn unsubscribe(&mut self, app: &str, path: &str) -> Option<bool> {
209        let index = self
210            .subscription_list
211            .iter()
212            .position(|s| s.app == app && s.path == path)?;
213        self.subscription_list.remove(index);
214        Some(true)
215    }
216
217    /// Deletes the channel
218    pub fn delete_channel(self) {
219        let mut json = json::parse(r#"[]"#).unwrap();
220        json[0] = object! {
221            "id": self.message_id_count,
222            "action": "delete",
223        };
224        let _res = self.ship_interface.send_put_request(&self.url, &json);
225        std::mem::drop(self);
226    }
227
228    /// Acquires and returns the current `message_id_count` from the
229    /// `ShipInterface` that this channel was created from while also
230    /// increase said value by 1.
231    fn get_and_raise_message_id_count(&mut self) -> u64 {
232        let current_id_count = self.message_id_count;
233        self.message_id_count += 1;
234        current_id_count
235    }
236}
237
238/// `Channel` methods which expose advanced functionality, typically by
239/// producing another struct which is built on top of `Channel`.
240impl Channel {
241    /// Create a `Chat` struct which exposes an interface for interacting
242    /// with chats on Urbit
243    pub fn chat(&mut self) -> Chat {
244        Chat { channel: self }
245    }
246
247    /// Create a `Notebook` struct which exposes an interface for interacting
248    /// with notebooks on Urbit
249    pub fn notebook(&mut self) -> Notebook {
250        Notebook { channel: self }
251    }
252
253    /// Create a `GraphStore` struct which exposes an interface for interacting
254    /// with a ship's Graph Store.
255    pub fn graph_store(&mut self) -> GraphStore {
256        GraphStore { channel: self }
257    }
258
259    /// Create a `Collection` struct which exposes an interface for interacting
260    /// with collections on Urbit.
261    pub fn collection(&mut self) -> Collection {
262        Collection { channel: self }
263    }
264}