1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
//! `Eth` namespace, subscriptions

use std::marker::PhantomData;

use crate::api::Namespace;
use crate::helpers::{self, CallFuture};
use crate::types::{BlockHeader, Filter, Log, SyncState, H256};
use crate::{DuplexTransport, Error};
use futures::{Async, Future, Poll, Stream};
use serde;
use serde_json;

/// `Eth` namespace, subscriptions
#[derive(Debug, Clone)]
pub struct EthSubscribe<T> {
    transport: T,
}

impl<T: DuplexTransport> Namespace<T> for EthSubscribe<T> {
    fn new(transport: T) -> Self
    where
        Self: Sized,
    {
        EthSubscribe { transport }
    }

    fn transport(&self) -> &T {
        &self.transport
    }
}

/// ID of subscription returned from `eth_subscribe`
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct SubscriptionId(String);

impl From<String> for SubscriptionId {
    fn from(s: String) -> Self {
        SubscriptionId(s)
    }
}

/// Stream of notifications from a subscription
/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
/// notifications are delivered.
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
    transport: T,
    id: SubscriptionId,
    rx: T::NotificationStream,
    _marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
    fn new(transport: T, id: SubscriptionId) -> Self {
        let rx = transport.subscribe(&id);
        SubscriptionStream {
            transport,
            id,
            rx,
            _marker: PhantomData,
        }
    }

    /// Return the ID of this subscription
    pub fn id(&self) -> &SubscriptionId {
        &self.id
    }

    /// Unsubscribe from the event represented by this stream
    pub fn unsubscribe(self) -> CallFuture<bool, T::Out> {
        let &SubscriptionId(ref id) = &self.id;
        let id = helpers::serialize(&id);
        CallFuture::new(self.transport.execute("eth_unsubscribe", vec![id]))
    }
}

impl<T, I> Stream for SubscriptionStream<T, I>
where
    T: DuplexTransport,
    I: serde::de::DeserializeOwned,
{
    type Item = I;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.rx.poll() {
            Ok(Async::Ready(Some(x))) => serde_json::from_value(x).map(Async::Ready).map_err(Into::into),
            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

impl<T: DuplexTransport, I> Drop for SubscriptionStream<T, I> {
    fn drop(&mut self) {
        self.transport.unsubscribe(self.id());
    }
}

#[derive(Debug)]
pub struct SubscriptionResult<T: DuplexTransport, I> {
    transport: T,
    inner: CallFuture<String, T::Out>,
    _marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionResult<T, I> {
    pub fn new(transport: T, id_future: CallFuture<String, T::Out>) -> Self {
        SubscriptionResult {
            transport,
            inner: id_future,
            _marker: PhantomData,
        }
    }
}

impl<T, I> Future for SubscriptionResult<T, I>
where
    T: DuplexTransport,
    I: serde::de::DeserializeOwned,
{
    type Item = SubscriptionStream<T, I>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.inner.poll() {
            Ok(Async::Ready(id)) => Ok(Async::Ready(SubscriptionStream::new(
                self.transport.clone(),
                SubscriptionId(id),
            ))),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}

impl<T: DuplexTransport> EthSubscribe<T> {
    /// Create a new heads subscription
    pub fn subscribe_new_heads(&self) -> SubscriptionResult<T, BlockHeader> {
        let subscription = helpers::serialize(&&"newHeads");
        let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
        SubscriptionResult::new(self.transport().clone(), id_future)
    }

    /// Create a logs subscription
    pub fn subscribe_logs(&self, filter: Filter) -> SubscriptionResult<T, Log> {
        let subscription = helpers::serialize(&&"logs");
        let filter = helpers::serialize(&filter);
        let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription, filter]));
        SubscriptionResult::new(self.transport().clone(), id_future)
    }

    /// Create a pending transactions subscription
    pub fn subscribe_new_pending_transactions(&self) -> SubscriptionResult<T, H256> {
        let subscription = helpers::serialize(&&"newPendingTransactions");
        let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
        SubscriptionResult::new(self.transport().clone(), id_future)
    }

    /// Create a sync status subscription
    pub fn subscribe_syncing(&self) -> SubscriptionResult<T, SyncState> {
        let subscription = helpers::serialize(&&"syncing");
        let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
        SubscriptionResult::new(self.transport().clone(), id_future)
    }
}