ezomyte 0.0.2

Path of Exile API client library
Documentation
//! Module for the `Stashes` accessor object.

use std::borrow::Cow;

use futures::{future, Future as StdFuture, stream, Stream as StdStream};
use hyper::client::Connect;

use ::{Client, Stash};
use super::{Batched, Stream};


/// Accessor object for the public stashes.
///
/// Holds the `Client` through which all of its requests are issued.
#[derive(Clone, Debug)]
pub struct Stashes<C: Clone + Connect> {
    /// Client used to query the API.
    client: Client<C>,
}

impl<C> Stashes<C>
    where C: Clone + Connect
{
    /// Creates the accessor around the given `client`.
    ///
    /// Only constructible from within this crate.
    #[inline]
    pub(crate) fn new(client: Client<C>) -> Self {
        Self { client }
    }
}

impl<C: Clone + Connect> Stashes<C> {
    /// Returns a stream of all `Stash` objects from the beginning of time.
    #[inline]
    pub fn all(&self) -> Stream<Batched<Stash>> {
        self.get_stashes_stream(None)
    }

    /// Returns a stream of `Stash` objects beginning at given `change_id`.
    #[inline]
    pub fn since<Cid>(&self, change_id: Cid) -> Stream<Batched<Stash>>
        where Cid: Into<String>
    {
        self.get_stashes_stream(Some(change_id.into()))
    }
    // TODO: also, a way to obtain the "current" next-change-id would be nice
    // (or even a method like newest() to automatically start from there)

    fn get_stashes_stream(&self, change_id: Option<String>) -> Stream<Batched<Stash>> {
        /// Enum for managing the state machine of the resulting Stream.
        enum State {
            Start{change_id: Option<String>},
            Next{change_id: String},
            End,
        }

        // Repeatedly query the public stash tabs endpoint
        // and yield `Stash` items as they come using Stream::unfold.
        let this = self.clone();
        Box::new(
            stream::unfold(State::Start{change_id}, move |state| {
                let change_id = match state {
                    State::Start{change_id} => change_id,
                    State::Next{change_id} => Some(change_id),
                    // We handle stream termination via State::End
                    // so that the last page of results is correctly included.
                    State::End => return None,
                };
                let url: Cow<str> = match change_id.as_ref() {
                    Some(cid) => format!("{}?id={}", STASHES_URL, cid).into(),
                    None => STASHES_URL.into(),
                };
                // TODO: what happens when an invalid change_id is passed?
                // we should handle that as a separate error type here
                // (which may wrap the crate-level Error)
                Some(this.client.get(url).and_then(move |resp: PublicStashTabsResponse| {
                    let stashes = {
                        // Wrap the returned stashes in `Batched` type
                        // to include the current & next change_id.
                        let curr_cid = change_id.clone();
                        let next_cid = resp.next_change_id.clone();
                        resp.stashes.into_iter().map(move |entry| Batched{
                            entry,
                            // TODO: these clones are probably unnecessary
                            // if we returned Batched<Stash, &str>
                            curr_token: curr_cid.clone(),
                            next_token: next_cid.clone(),
                        })
                    };
                    let next_state = match resp.next_change_id {
                        Some(next_cid) => {
                            // If we got the same change_id, we've reached the end.
                            let same_cid = change_id.as_ref().map(|cid| cid == &next_cid)
                                .unwrap_or(false);
                            if same_cid { State::End } else {
                                State::Next{change_id: next_cid}
                            }
                        }
                        None => {
                            // According to API docs, this is not supposed to happen.
                            warn!("No next_change_id found in stash tabs' response");
                            State::End
                        }
                    };
                    future::ok((stream::iter_ok(stashes), next_state))
                }))
            })
            .flatten()
        )
    }
}

/// Path of the /public-stash-tabs API endpoint.
///
/// Requested as-is when no change ID is known; otherwise an `?id=<change_id>`
/// query string is appended to it.
const STASHES_URL: &str = "/public-stash-tabs";


/// Response from the /public-stash-tabs API endpoint.
///
/// Represents a single page of results, as deserialized from the response.
#[derive(Debug, Deserialize)]
struct PublicStashTabsResponse {
    /// Change ID to use for requesting the following page.
    /// `None` when the response did not contain one.
    next_change_id: Option<String>,
    /// Stashes included in this page.
    stashes: Vec<Stash>,
}