urbit_http_api/
channel.rs1use crate::apps::chat::Chat;
2use crate::apps::collections::Collection;
3use crate::apps::notebook::Notebook;
4use crate::error::{Result, UrbitAPIError};
5use crate::graphstore::GraphStore;
6use crate::interface::ShipInterface;
7use crate::subscription::{CreationID, Subscription};
8use eventsource_threaded::{EventSource, ReceiverSource};
9use json::{object, JsonValue};
10use rand::Rng;
11use reqwest::blocking::Response;
12use reqwest::header::HeaderMap;
13use reqwest::Url;
14use std::time::SystemTime;
15
16#[derive(Debug)]
18pub struct Channel {
19 pub ship_interface: ShipInterface,
21 pub uid: String,
23 pub url: String,
25 pub subscription_list: Vec<Subscription>,
27 event_receiver: ReceiverSource,
30 pub message_id_count: u64,
33}
34
35impl Channel {
37 pub fn new(ship_interface: ShipInterface) -> Result<Channel> {
39 let mut rng = rand::thread_rng();
40 let uid = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
42 Ok(n) => n.as_micros(),
43 Err(_) => rng.gen(),
44 }
45 .to_string();
46
47 let channel_url = format!("{}/~/channel/{}", &ship_interface.url, uid);
49 let mut body = json::parse(r#"[]"#).unwrap();
51 body[0] = object! {
52 "id": 1,
53 "action": "poke",
54 "ship": ship_interface.ship_name.clone(),
55 "app": "hood",
56 "mark": "helm-hi",
57 "json": "Opening channel",
58 };
59
60 let resp = ship_interface.send_put_request(&channel_url, &body)?;
62
63 if resp.status().as_u16() == 204 {
64 let mut headers = HeaderMap::new();
66 headers.append("cookie", ship_interface.session_auth.clone());
67 let url_structured =
69 Url::parse(&channel_url).map_err(|_| UrbitAPIError::FailedToCreateNewChannel)?;
70 let receiver = EventSource::new(url_structured, headers);
71
72 return Ok(Channel {
73 ship_interface: ship_interface,
74 uid: uid,
75 url: channel_url,
76 subscription_list: vec![],
77 event_receiver: receiver,
78 message_id_count: 2,
79 });
80 } else {
81 return Err(UrbitAPIError::FailedToCreateNewChannel);
82 }
83 }
84
85 pub fn poke(&mut self, app: &str, mark: &str, json: &JsonValue) -> Result<Response> {
87 let mut body = json::parse(r#"[]"#).unwrap();
88 body[0] = object! {
89 "id": self.get_and_raise_message_id_count(),
90 "action": "poke",
91 "ship": self.ship_interface.ship_name.clone(),
92 "app": app,
93 "mark": mark,
94 "json": json.clone(),
95 };
96
97 self.ship_interface.send_put_request(&self.url, &body)
99 }
100
101 pub fn scry(&self, app: &str, path: &str, mark: &str) -> Result<Response> {
103 self.ship_interface.scry(app, path, mark)
104 }
105
106 pub fn spider(
108 &self,
109 input_mark: &str,
110 output_mark: &str,
111 thread_name: &str,
112 body: &JsonValue,
113 ) -> Result<Response> {
114 self.ship_interface
115 .spider(input_mark, output_mark, thread_name, body)
116 }
117
118 pub fn create_new_subscription(&mut self, app: &str, path: &str) -> Result<CreationID> {
121 let creation_id = self.get_and_raise_message_id_count();
123 let mut body = json::parse(r#"[]"#).unwrap();
125 body[0] = object! {
126 "id": creation_id,
127 "action": "subscribe",
128 "ship": self.ship_interface.ship_name.clone(),
129 "app": app.to_string(),
130 "path": path.to_string(),
131 };
132
133 let resp = self.ship_interface.send_put_request(&self.url, &body)?;
135
136 if resp.status().as_u16() == 204 {
137 let sub = Subscription {
139 channel_uid: self.uid.clone(),
140 creation_id: creation_id,
141 app: app.to_string(),
142 path: path.to_string(),
143 message_list: vec![],
144 };
145 self.subscription_list.push(sub.clone());
147 return Ok(creation_id);
148 } else {
149 return Err(UrbitAPIError::FailedToCreateNewSubscription);
150 }
151 }
152
153 pub fn parse_event_messages(&mut self) {
156 let rec = &mut self.event_receiver;
157
158 loop {
160 if let Ok(event_res) = rec.try_recv() {
161 if let Err(e) = &event_res {
162 println!("Error Event: {}", e);
163 }
164 if let Ok(event) = event_res {
165 for sub in &mut self.subscription_list {
168 if let Some(_) = sub.add_to_message_list(&event) {
171 let eid: u64 = event.id.unwrap().parse().unwrap();
175 let mut json = json::parse(r#"[]"#).unwrap();
176 json[0] = object! {
177 "id": self.message_id_count,
178 "action": "ack",
179 "event-id": eid,
180 };
181 self.message_id_count += 1;
182 let _ack_res = self.ship_interface.send_put_request(&self.url, &json);
183 break;
184 }
185 }
186 }
187 continue;
188 }
189 break;
190 }
191 }
192
193 pub fn find_subscription(&mut self, app: &str, path: &str) -> Option<&mut Subscription> {
196 for sub in &mut self.subscription_list {
197 if sub.app == app && sub.path == path {
198 return Some(sub);
199 }
200 }
201 None
202 }
203
204 pub fn unsubscribe(&mut self, app: &str, path: &str) -> Option<bool> {
209 let index = self
210 .subscription_list
211 .iter()
212 .position(|s| s.app == app && s.path == path)?;
213 self.subscription_list.remove(index);
214 Some(true)
215 }
216
217 pub fn delete_channel(self) {
219 let mut json = json::parse(r#"[]"#).unwrap();
220 json[0] = object! {
221 "id": self.message_id_count,
222 "action": "delete",
223 };
224 let _res = self.ship_interface.send_put_request(&self.url, &json);
225 std::mem::drop(self);
226 }
227
228 fn get_and_raise_message_id_count(&mut self) -> u64 {
232 let current_id_count = self.message_id_count;
233 self.message_id_count += 1;
234 current_id_count
235 }
236}
237
238impl Channel {
241 pub fn chat(&mut self) -> Chat {
244 Chat { channel: self }
245 }
246
247 pub fn notebook(&mut self) -> Notebook {
250 Notebook { channel: self }
251 }
252
253 pub fn graph_store(&mut self) -> GraphStore {
256 GraphStore { channel: self }
257 }
258
259 pub fn collection(&mut self) -> Collection {
262 Collection { channel: self }
263 }
264}