1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
use crate::apps::chat::Chat;
use crate::apps::collections::Collection;
use crate::apps::notebook::Notebook;
use crate::error::{Result, UrbitAPIError};
use crate::graphstore::GraphStore;
use crate::interface::ShipInterface;
use crate::subscription::{CreationID, Subscription};
use eventsource_threaded::{EventSource, ReceiverSource};
use json::{object, JsonValue};
use rand::Rng;
use reqwest::blocking::Response;
use reqwest::header::HeaderMap;
use reqwest::Url;
use std::time::SystemTime;

/// A Channel which is used to interact with a ship
#[derive(Debug)]
pub struct Channel {
    /// `ShipInterface` this channel is created from
    pub ship_interface: ShipInterface,
    /// The uid of the channel
    pub uid: String,
    /// The url of the channel
    pub url: String,
    /// The list of `Subscription`s for this channel
    pub subscription_list: Vec<Subscription>,
    /// The receiver for this channel, from which all of the
    /// SSE events are read.
    event_receiver: ReceiverSource,
    /// The current number of messages that have been sent out (which are
    /// also defined as message ids) via this `Channel`
    pub message_id_count: u64,
}

/// Channel methods for basic functionality
impl Channel {
    /// Create a new channel
    ///
    /// Opens the channel on the ship by sending an initial `helm-hi` poke
    /// (message id `1`) to a freshly generated channel url, then attaches an
    /// event receiver to that url using the ship's session cookie.
    ///
    /// # Errors
    ///
    /// Propagates any error from the PUT request. Returns
    /// `UrbitAPIError::FailedToCreateNewChannel` if the ship does not answer
    /// with status `204` or if the channel url cannot be parsed.
    pub fn new(ship_interface: ShipInterface) -> Result<Channel> {
        // Defining the uid as UNIX time (in microseconds), or random if the
        // system clock is set before the epoch.
        let uid = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_micros(),
            Err(_) => rand::thread_rng().gen(),
        }
        .to_string();

        // Channel url
        let channel_url = format!("{}/~/channel/{}", &ship_interface.url, uid);
        // Opening channel request json
        let body = JsonValue::Array(vec![object! {
            "id": 1,
            "action": "poke",
            "ship": ship_interface.ship_name.clone(),
            "app": "hood",
            "mark": "helm-hi",
            "json": "Opening channel",
        }]);

        // Make the put request to create the channel.
        let resp = ship_interface.send_put_request(&channel_url, &body)?;
        if resp.status().as_u16() != 204 {
            return Err(UrbitAPIError::FailedToCreateNewChannel);
        }

        // Create cookie header with the ship session auth val
        let mut headers = HeaderMap::new();
        headers.append("cookie", ship_interface.session_auth.clone());
        // Create the receiver
        let url_structured =
            Url::parse(&channel_url).map_err(|_| UrbitAPIError::FailedToCreateNewChannel)?;
        let receiver = EventSource::new(url_structured, headers);

        Ok(Channel {
            ship_interface,
            uid,
            url: channel_url,
            subscription_list: vec![],
            event_receiver: receiver,
            // Message id `1` was consumed by the opening poke above.
            message_id_count: 2,
        })
    }

    /// Sends a poke over the channel
    ///
    /// Consumes one message id and returns the raw response of the PUT
    /// request; the response status is not inspected here.
    pub fn poke(&mut self, app: &str, mark: &str, json: &JsonValue) -> Result<Response> {
        let id = self.get_and_raise_message_id_count();
        let body = JsonValue::Array(vec![object! {
            "id": id,
            "action": "poke",
            "ship": self.ship_interface.ship_name.clone(),
            "app": app,
            "mark": mark,
            "json": json.clone(),
        }]);

        // Make the put request for the poke
        self.ship_interface.send_put_request(&self.url, &body)
    }

    /// Sends a scry to the ship
    ///
    /// Thin wrapper which forwards to `ShipInterface::scry`.
    pub fn scry(&self, app: &str, path: &str, mark: &str) -> Result<Response> {
        self.ship_interface.scry(app, path, mark)
    }

    /// Run a thread via spider
    ///
    /// Thin wrapper which forwards to `ShipInterface::spider`.
    pub fn spider(
        &self,
        input_mark: &str,
        output_mark: &str,
        thread_name: &str,
        body: &JsonValue,
    ) -> Result<Response> {
        self.ship_interface
            .spider(input_mark, output_mark, thread_name, body)
    }

    /// Create a new `Subscription` and thus subscribes to events on the
    /// ship with the provided app/path.
    ///
    /// Returns the message id used for the subscribe request, which is also
    /// stored as the `creation_id` of the new `Subscription`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the PUT request, and returns
    /// `UrbitAPIError::FailedToCreateNewSubscription` if the ship does not
    /// answer with status `204`. The message id is consumed either way.
    pub fn create_new_subscription(&mut self, app: &str, path: &str) -> Result<CreationID> {
        // Saves the message id to be reused
        let creation_id = self.get_and_raise_message_id_count();
        // Create the json body
        let body = JsonValue::Array(vec![object! {
            "id": creation_id,
            "action": "subscribe",
            "ship": self.ship_interface.ship_name.clone(),
            "app": app,
            "path": path,
        }]);

        // Make the put request to create the subscription.
        let resp = self.ship_interface.send_put_request(&self.url, &body)?;
        if resp.status().as_u16() != 204 {
            return Err(UrbitAPIError::FailedToCreateNewSubscription);
        }

        // Create the `Subscription` and add it to the list
        self.subscription_list.push(Subscription {
            channel_uid: self.uid.clone(),
            creation_id,
            app: app.to_string(),
            path: path.to_string(),
            message_list: vec![],
        });
        Ok(creation_id)
    }

    /// Parses SSE messages for this channel and moves them into
    /// the proper corresponding `Subscription`'s `message_list`.
    ///
    /// Drains every event currently available on the receiver without
    /// blocking. Each event accepted by a `Subscription` is acknowledged to
    /// the ship; events no `Subscription` accepts are dropped without an ack.
    /// Receive errors are reported on stderr and skipped.
    pub fn parse_event_messages(&mut self) {
        // Consume all messages; the loop ends as soon as `try_recv` has
        // nothing more to hand out.
        while let Ok(event_res) = self.event_receiver.try_recv() {
            let event = match event_res {
                Ok(event) => event,
                Err(e) => {
                    eprintln!("Error Event: {}", e);
                    continue;
                }
            };

            // Go through all subscriptions and stop at the first one which
            // accepts the event (`any` short-circuits on the first match).
            let delivered = self
                .subscription_list
                .iter_mut()
                .any(|sub| sub.add_to_message_list(&event).is_some());
            if !delivered {
                continue;
            }

            // `add_to_message_list` is expected to have validated the event
            // id already; should it be missing or malformed anyway, skip the
            // ack instead of panicking.
            let eid = match event.id.as_ref().and_then(|id| id.parse::<u64>().ok()) {
                Some(eid) => eid,
                None => continue,
            };

            // Send an ack for the processed event, consuming one message id.
            let ack_id = self.message_id_count;
            self.message_id_count += 1;
            let ack = JsonValue::Array(vec![object! {
                "id": ack_id,
                "action": "ack",
                "event-id": eid,
            }]);
            // Best effort: a failed ack must not stop the remaining events
            // from being processed.
            let _ack_res = self.ship_interface.send_put_request(&self.url, &ack);
        }
    }

    /// Finds the first `Subscription` in the list which has a matching
    /// `app` and `path`.
    pub fn find_subscription(&mut self, app: &str, path: &str) -> Option<&mut Subscription> {
        self.subscription_list
            .iter_mut()
            .find(|sub| sub.app == app && sub.path == path)
    }

    /// Finds the first `Subscription` in the list which has a matching
    /// `app` and `path`, removes it from the list, and tells the ship
    /// that you are unsubscribing. Returns `None` if failed to find
    /// a subscription with a matching app & path.
    ///
    /// The local entry is removed even if the ship cannot be reached.
    /// Returns `Some(true)` when the ship answered the unsubscribe request
    /// with status `204`, and `Some(false)` when the request failed or was
    /// answered with any other status.
    pub fn unsubscribe(&mut self, app: &str, path: &str) -> Option<bool> {
        let index = self
            .subscription_list
            .iter()
            .position(|s| s.app == app && s.path == path)?;
        let removed = self.subscription_list.remove(index);

        // The ship identifies a subscription by the message id that was
        // used to create it.
        let id = self.get_and_raise_message_id_count();
        let body = JsonValue::Array(vec![object! {
            "id": id,
            "action": "unsubscribe",
            "subscription": removed.creation_id,
        }]);
        let acknowledged = self
            .ship_interface
            .send_put_request(&self.url, &body)
            .map(|resp| resp.status().as_u16() == 204)
            .unwrap_or(false);
        Some(acknowledged)
    }

    /// Deletes the channel
    ///
    /// Sends a `delete` action to the ship and consumes the `Channel`. The
    /// request is best effort: its result is ignored.
    pub fn delete_channel(self) {
        let body = JsonValue::Array(vec![object! {
            "id": self.message_id_count,
            "action": "delete",
        }]);
        let _res = self.ship_interface.send_put_request(&self.url, &body);
        // `self` is dropped here as it goes out of scope.
    }

    /// Returns the current `message_id_count` of this `Channel` while
    /// also increasing said value by 1, so every call yields a fresh
    /// message id.
    fn get_and_raise_message_id_count(&mut self) -> u64 {
        let current_id_count = self.message_id_count;
        self.message_id_count += 1;
        current_id_count
    }
}

/// Advanced `Channel` functionality. Every method here hands out another
/// struct which is built on top of this `Channel` and holds a mutable
/// reference to it.
impl Channel {
    /// Produces a `Chat`, the interface for interacting with chats
    /// on Urbit.
    pub fn chat(&mut self) -> Chat {
        let channel = self;
        Chat { channel }
    }

    /// Produces a `Notebook`, the interface for interacting with
    /// notebooks on Urbit.
    pub fn notebook(&mut self) -> Notebook {
        let channel = self;
        Notebook { channel }
    }

    /// Produces a `GraphStore`, the interface for interacting with
    /// a ship's Graph Store.
    pub fn graph_store(&mut self) -> GraphStore {
        let channel = self;
        GraphStore { channel }
    }

    /// Produces a `Collection`, the interface for interacting with
    /// collections on Urbit.
    pub fn collection(&mut self) -> Collection {
        let channel = self;
        Collection { channel }
    }
}