1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//! A client connection to BigML.

use bytes::Bytes;
use futures::{prelude::*, FutureExt};
use reqwest::{self, multipart, StatusCode};
use serde::de::DeserializeOwned;
use std::env;
use std::error;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::fs;
use tokio_util::codec;
use tracing::debug;
use tracing::instrument;
use url::Url;

use crate::errors::*;
use crate::progress::ProgressOptions;
use crate::resource::{self, Id, Resource, Source, Updatable};
use crate::wait::{wait, BackoffType, WaitOptions, WaitStatus};

/// The default domain to use for making API requests to BigML.
pub static DEFAULT_BIGML_DOMAIN: &str = "bigml.io";

/// A client connection to BigML.
pub struct Client {
    url: Url,
    username: String,
    api_key: String,
}

impl Client {
    /// Create a new `Client` that will connect to `DEFAULT_BIGML_DOMAIN`.
    pub fn new<S1, S2>(username: S1, api_key: S2) -> Result<Client>
    where
        // It's unclear whether it's worthwhile to make these generic. We only
        // do it for backward-compatibility.
        S1: Into<String>,
        S2: Into<String>,
    {
        Self::new_with_domain(DEFAULT_BIGML_DOMAIN, username, api_key)
    }

    /// Create a new `Client`, specifying the BigML domain to connect to. Use
    /// this if you have a specially hosted BigML instance.
    #[instrument(level = "trace", skip(username, api_key))]
    pub fn new_with_domain<S1, S2>(
        domain: &str,
        username: S1,
        api_key: S2,
    ) -> Result<Client>
    where
        // It's unclear whether it's worthwhile to make these generic. We only
        // do it for consistency.
        S1: Into<String>,
        S2: Into<String>,
    {
        let url_str = format!("https://{}/", domain);
        let url = url_str
            .parse()
            .map_err(|err| Error::could_not_parse_url_with_domain(domain, err))?;
        Ok(Client {
            url,
            username: username.into(),
            api_key: api_key.into(),
        })
    }

    /// Create a new client, using the environment variables `BIGML_USERNAME`,
    /// `BIGML_API_KEY` and optionally `BIGML_DOMAIN` to configure it.
    pub fn new_from_env() -> Result<Client> {
        let domain = env::var("BIGML_DOMAIN")
            .unwrap_or_else(|_| DEFAULT_BIGML_DOMAIN.to_owned());
        let username = env::var("BIGML_USERNAME")
            .map_err(|_| Error::missing_env_var("BIGML_USERNAME"))?;
        let api_key = env::var("BIGML_API_KEY")
            .map_err(|_| Error::missing_env_var("BIGML_API_KEY"))?;
        Self::new_with_domain(&domain, username, api_key)
    }

    /// Format our BigML auth credentials.
    fn auth(&self) -> String {
        format!("username={}&api_key={}", self.username, self.api_key)
    }

    /// Generate an authenticated URL with the specified path.
    fn url(&self, path: &str) -> Url {
        let mut url: Url = self.url.clone();
        url.set_path(path);
        url.set_query(Some(&self.auth()));
        url
    }

    /// Create a new resource.
    #[instrument(level = "trace", skip(self, args))]
    pub async fn create<'a, Args>(&'a self, args: &'a Args) -> Result<Args::Resource>
    where
        Args: resource::Args,
    {
        let url = self.url(Args::Resource::create_path());
        debug!(
            "POST {} {:#?}",
            Args::Resource::create_path(),
            &serde_json::to_string(args)
        );
        let client = reqwest::Client::new();
        let res = client
            .post(url.clone())
            .json(args)
            .send()
            .await
            .map_err(|e| Error::could_not_access_url(&url, e))?;
        self.handle_response_and_deserialize(&url, res).await
    }

    /// Create a new resource, and wait until it is ready.
    #[instrument(level = "trace", skip(self, args))]
    pub async fn create_and_wait<'a, Args>(
        &'a self,
        args: &'a Args,
    ) -> Result<Args::Resource>
    where
        Args: resource::Args,
    {
        let resource = self.create(args).await?;
        self.wait(resource.id()).await
    }

    /// Create a BigML data source using data from the specified stream.  We
    /// stream the data over the network without trying to load it all into
    /// memory at once.
    #[deprecated = "This won't work until BigML fixes Transfer-Encoding: chunked"]
    pub async fn create_source_from_stream<S>(
        &self,
        filename: &str,
        stream: S,
    ) -> Result<Source>
    where
        S: TryStream + Send + Sync + 'static,
        S::Error: Into<Box<dyn error::Error + Send + Sync>>,
        Bytes: From<S::Ok>,
    {
        debug!("uploading {} from stream", filename);

        let data = multipart::Part::stream(reqwest::Body::wrap_stream(stream))
            .mime_str("application/octet-stream")?;
        let form = multipart::Form::new().part("file", data);

        // Post our request.
        let url = self.url("/source");
        let client = reqwest::Client::new();
        let res = client
            .post(url.clone())
            .multipart(form)
            .send()
            .await
            .map_err(|e| Error::could_not_access_url(&url, e))?;
        self.handle_response_and_deserialize(&url, res).await
    }

    /// Create a BigML data source using data from the specified path.  We
    /// stream the data over the network without trying to load it all into
    /// memory at once.
    #[allow(clippy::needless_lifetimes, deprecated)]
    #[deprecated = "This won't work until BigML fixes Transfer-Encoding: chunked"]
    pub async fn create_source_from_path(&self, path: PathBuf) -> Result<Source> {
        // Convert our path to a stream of `Bytes`.
        let file = fs::File::open(&path)
            .await
            .map_err(|err| Error::could_not_read_file(&path, err))?;
        let err_path = path.clone();
        let stream = codec::FramedRead::new(file, codec::BytesCodec::new())
            .map_ok(|bytes| bytes.freeze())
            .map_err(move |err| Error::could_not_read_file(&err_path, err));

        // Create our source.
        let filename = path.to_string_lossy();
        self.create_source_from_stream(&filename, stream).await
    }

    /// Create a BigML data source using data from the specified path.  We
    /// stream the data over the network without trying to load it all into
    /// memory.
    #[allow(clippy::needless_lifetimes, deprecated)]
    #[deprecated = "This won't work until BigML fixes Transfer-Encoding: chunked"]
    pub async fn create_source_from_path_and_wait(
        &self,
        path: PathBuf,
    ) -> Result<Source> {
        let source = self.create_source_from_path(path).await?;
        // Only wait 2 hours for a source to be created
        let options = WaitOptions::default().timeout(Duration::from_secs(2 * 60 * 60));
        let mut progress_options = ProgressOptions::default();
        self.wait_opt(source.id(), &options, &mut progress_options)
            .await
    }

    /// Update the specified `resource` using `update`. We do not return the
    /// updated resource because of peculiarities with BigML's API, but you
    /// can always use `Client::fetch` if you need the updated version.
    #[instrument(level = "trace", skip(self))]
    pub async fn update<'a, R: Resource + Updatable>(
        &'a self,
        resource: &'a Id<R>,
        update: &'a <R as Updatable>::Update,
    ) -> Result<()> {
        let url = self.url(resource.as_str());
        debug!("PUT {}: {:?}", url, update);
        let client = reqwest::Client::new();
        let res = client
            .request(reqwest::Method::PUT, url.clone())
            .json(update)
            .send()
            .await
            .map_err(|e| Error::could_not_access_url(&url, e))?;
        // Parse our result as JSON, because it often seems to be missing
        // fields like `name` for `Source`. It's not always a complete,
        // valid resource.
        let _json: serde_json::Value =
            self.handle_response_and_deserialize(&url, res).await?;

        Ok(())
    }

    /// Fetch an existing resource.
    #[instrument(level = "trace", skip(self))]
    pub async fn fetch<'a, R: Resource>(&'a self, resource: &'a Id<R>) -> Result<R> {
        let url = self.url(resource.as_str());
        let client = reqwest::Client::new();
        let res = client
            .get(url.clone())
            .send()
            .await
            .map_err(|e| Error::could_not_access_url(&url, e))?;
        self.handle_response_and_deserialize(&url, res).await
    }

    /// Poll an existing resource, returning it once it's ready.
    ///
    /// If an underlying BigML error occurs, it can be accessed using
    /// [`Error::original_bigml_error`].
    #[instrument(level = "trace", skip(self))]
    pub async fn wait<'a, R: Resource>(&'a self, resource: &'a Id<R>) -> Result<R> {
        let options = WaitOptions::default()
            .backoff_type(BackoffType::Exponential)
            .retry_interval(Duration::from_secs(10))
            .allowed_errors(6);
        let mut progress_options = ProgressOptions::default();
        self.wait_opt(resource, &options, &mut progress_options)
            .await
    }

    /// Poll an existing resource, returning it once it's ready, and honoring
    /// wait and progress options.
    ///
    /// If an underlying BigML error occurs, it can be accessed using
    /// [`Error::original_bigml_error`].
    #[instrument(level = "trace", skip(self, wait_options, progress_options))]
    pub async fn wait_opt<'a, 'b, R: Resource>(
        &self,
        resource: &'a Id<R>,
        wait_options: &'a WaitOptions,
        progress_options: &'a mut ProgressOptions<'b, R>,
    ) -> Result<R> {
        let url = self.url(resource.as_str());
        debug!("Waiting for {}", url_without_api_key(&url));

        // We actually want to pass an `aync || { ... }` to `wait`, below, but
        // async closures are going to stablize later than the rest of
        // `async_await`. So we need to use `|| { async { ... } }`, which is a
        // regular closure that returns a future. Except that this doesn't
        // _quite_ work, because the `async { ... }` would contain a mutable
        // reference `progress_options`, which can't be allowed to escape the
        // outer `|| { ... }` block. So we cheat, and wrap our mutable state in
        // a lock. When `async || { ... }` stablizes, we can just delete this
        // line.
        let progress_options = Arc::new(RwLock::new(progress_options));

        wait(wait_options, || {
            let progress_options = progress_options.clone();
            async move {
                // TODO: Consider replacing `try_with_temporary_failure!`
                // and `try_with_permanent_failure!` with `try_wait!` and
                // appropriate error wrapping.
                let res = try_with_temporary_failure!(self.fetch(resource).await);
                if let Some(ref mut callback) =
                    progress_options.write().unwrap().callback
                {
                    try_with_permanent_failure!(callback(&res));
                }
                if res.status().code().is_ready() {
                    WaitStatus::Finished(res)
                } else if res.status().code().is_err() {
                    let message = res.status().message();
                    let err = Error::WaitFailed {
                        id: resource.to_string(),
                        message: message.to_owned(),
                    };
                    // In general, we want to fail for good here, because even
                    // if this error could be fixed, it's going to have to be
                    // fixed at a higher level than this call to `wait_opt`.
                    // Most likely, the underlying BigML resource will need to
                    // be recreated from scratch and waited on again.
                    //
                    // DO NOT USE `Error::might_be_temporary` here, because we
                    // know that `Error::WaitFailed` represents an error that
                    // won't get fixed by waiting more.
                    WaitStatus::FailedPermanently(err)
                } else {
                    WaitStatus::Waiting
                }
            }
            .boxed()
        })
        .await
        .map_err(|e| Error::could_not_access_url(&url, e))
    }

    /// Download a resource as a CSV file.  This only makes sense for
    /// certain kinds of resources.
    pub async fn download<'a, R: Resource>(
        &'a self,
        resource: &'a Id<R>,
    ) -> Result<reqwest::Response> {
        // This timeout needs to be set fairly high, because when we first try
        // to download a dataset, even one which has been `wait`ed on, we get
        // back a JSON message informing us that the dataset isn't ready for
        // download yet. We've definitely seen this process take longer than 3
        // minutes, so let's try this.
        let options = WaitOptions::default().timeout(Duration::from_secs(10 * 60));
        self.download_opt(resource, &options).await
    }

    /// Download a resource as a CSV file.  This only makes sense for
    /// certain kinds of resources.
    #[instrument(level = "trace", skip(self))]
    pub async fn download_opt<'a, R: Resource>(
        &'a self,
        resource: &'a Id<R>,
        options: &'a WaitOptions,
    ) -> Result<reqwest::Response> {
        let url = self.url(&format!("{}/download", &resource));
        debug!("Downloading {}", url_without_api_key(&url));
        let client = reqwest::Client::new();
        wait(
            options,
            || -> Pin<Box<dyn Future<Output = WaitStatus<_, Error>> + Send>> {
                async {
                    // TODO: Consider replacing `try_with_temporary_failure!`
                    // and `try_with_permanent_failure!` with `try_wait!` and
                    // appropriate error wrapping.
                    let res = try_with_temporary_failure!(
                        client.get(url.clone()).send().await
                    );
                    if res.status().is_success() {
                        // Sometimes "/download" returns JSON instead of CSV, which
                        // is generally a sign that we need to wait.
                        let headers = res.headers().to_owned();
                        if let Some(ct) = headers.get("Content-Type") {
                            if ct.as_bytes().starts_with(b"application/json") {
                                let body =
                                    try_with_temporary_failure!(res.text().await);
                                debug!("Got JSON when downloading CSV: {}", body);
                                return WaitStatus::Waiting;
                            }
                        }
                        WaitStatus::Finished(res)
                    } else {
                        try_with_temporary_failure!(
                            self.response_to_err(&url, res).await
                        );
                        // The above always returns `Err` and bails out, so we can't get
                        // here.
                        unreachable!()
                    }
                }
                .boxed()
            },
        )
        .await
        .map_err(|e| Error::could_not_access_url(&url, e))
    }

    /// Delete the specified resource.
    #[instrument(level = "trace", skip(self))]
    pub async fn delete<'a, R: Resource>(&'a self, resource: &'a Id<R>) -> Result<()> {
        let url = self.url(resource.as_str());
        let client = reqwest::Client::new();
        let res = client
            .request(reqwest::Method::DELETE, url.clone())
            .send()
            .await
            .map_err(|e| Error::could_not_access_url(&url, e))?;
        if res.status().is_success() {
            debug!("Deleted {}", &resource);
            Ok(())
        } else {
            self.response_to_err(&url, res).await
        }
    }

    /// Handle a response from the server, deserializing it as the
    /// appropriate type.
    #[instrument(level = "trace", skip(self, url, res))]
    async fn handle_response_and_deserialize<'a, T>(
        &'a self,
        url: &'a Url,
        res: reqwest::Response,
    ) -> Result<T>
    where
        T: DeserializeOwned,
    {
        if res.status().is_success() {
            let body = res
                .text()
                .await
                .map_err(|e| Error::could_not_access_url(url, e))?;
            debug!("Success body: {}", &body);
            let properties = serde_json::from_str(&body)
                .map_err(|e| Error::could_not_access_url(url, e))?;
            Ok(properties)
        } else {
            self.response_to_err(url, res).await
        }
    }

    async fn response_to_err<'a, T>(
        &'a self,
        url: &'a Url,
        res: reqwest::Response,
    ) -> Result<T> {
        let url = url.to_owned();
        let status: StatusCode = res.status().to_owned();
        let body = res.text().await?;
        debug!("Error status: {} body: {}", status, body);
        match status {
            StatusCode::PAYMENT_REQUIRED => Err(Error::PaymentRequired { url, body }),
            _ => Err(Error::UnexpectedHttpStatus { url, status, body }),
        }
    }
}

#[test]
fn client_url_is_sanitizable() {
    let client = Client::new("example", "secret").unwrap();
    let err_msg = Error::Other {
        source: "Details".into(),
    };
    let err = Error::could_not_access_url(&client.url("/test"), err_msg);
    let err_str = format!("{}", err);
    println!("err_str = {:?}", err_str);
    assert!(!err_str.contains("secret"));
}