1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use crate::{Connection, Event, EventData, EventType, Fallible, HttpClient, RawEvent};
use futures_core::ready;
use futures_core::stream::Stream;
use serde_json::{from_str, Error};
use std::cell::UnsafeCell;
use std::convert::TryFrom;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

impl TryFrom<RawEvent> for Event {
    type Error = Error;

    /// Decodes a [`RawEvent`] into a typed [`Event`].
    ///
    /// The raw JSON payload is deserialized into the [`EventData`] variant selected by the
    /// event's type; `id`, `global_id` and `time` are carried over unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if the payload cannot be deserialized into the
    /// data type of the matching variant.
    fn try_from(raw_event: RawEvent) -> Result<Self, Self::Error> {
        use EventData::*;

        // The payload is still undecoded at this point; which type it is parsed into is
        // decided by the event type below.
        let json = raw_event.data.get();
        let data = match raw_event.event_type {
            EventType::ConfigSaved => ConfigSaved(from_str(json)?),
            EventType::DeviceConnected => DeviceConnected(from_str(json)?),
            EventType::DeviceDisconnected => DeviceDisconnected(from_str(json)?),
            EventType::DeviceDiscovered => DeviceDiscovered(from_str(json)?),
            EventType::DevicePaused => DevicePaused(from_str(json)?),
            EventType::DeviceRejected => DeviceRejected(from_str(json)?),
            EventType::DeviceResumed => DeviceResumed(from_str(json)?),
            EventType::DownloadProgress => DownloadProgress(from_str(json)?),
            EventType::FolderCompletion => FolderCompletion(from_str(json)?),
            EventType::FolderErrors => FolderErrors(from_str(json)?),
            EventType::FolderRejected => FolderRejected(from_str(json)?),
            EventType::FolderScanProgress => FolderScanProgress(from_str(json)?),
            EventType::FolderSummary => FolderSummary(from_str(json)?),
            EventType::ItemFinished => ItemFinished(from_str(json)?),
            EventType::ItemStarted => ItemStarted(from_str(json)?),
            EventType::ListenAddressesChanged => ListenAddressesChanged(from_str(json)?),
            EventType::LocalChangeDetected => LocalChangeDetected(from_str(json)?),
            EventType::LocalIndexUpdated => LocalIndexUpdated(from_str(json)?),
            EventType::LoginAttempt => LoginAttempt(from_str(json)?),
            EventType::RemoteChangeDetected => RemoteChangeDetected(from_str(json)?),
            EventType::RemoteDownloadProgress => RemoteDownloadProgress(from_str(json)?),
            EventType::RemoteIndexUpdated => RemoteIndexUpdated(from_str(json)?),
            EventType::Starting => Starting(from_str(json)?),
            // This variant has no payload field, so the raw data is not parsed at all.
            EventType::StartupComplete => StartupComplete,
            EventType::StateChanged => StateChanged(from_str(json)?),
        };

        Ok(Event {
            id: raw_event.id,
            global_id: raw_event.global_id,
            time: raw_event.time,
            data,
        })
    }
}

// TODO: self-correction mechanism, see https://docs.syncthing.net/rest/events-get.html#events-get
/// A [`Stream`] of [`Event`]s obtained by repeatedly calling `get_events` on a
/// [`Connection`].
///
/// The stream is self-referential: the future held in `state` is created from references
/// to `connection` and `events` (see `queue_next_state`).
pub struct EventStream<C> {
    // Declared first on purpose: struct fields are dropped in declaration order, and a
    // pending future stored here is built from references to `connection` and `events`,
    // so it has to be dropped before either of them.
    /// Current position in the fetch / yield cycle.
    state: State,
    /// Connection on which `get_events` is called.
    connection: UnsafeCell<Connection<C>>,
    /// Optional list of event types forwarded to every `get_events` call.
    events: Option<UnsafeCell<Vec<EventType>>>,
    /// Id of the most recently yielded event, forwarded to the next `get_events` call;
    /// `None` until the first event has been yielded.
    since: Option<u64>,
}

/// Internal state machine driving `EventStream::poll_next`.
enum State {
    /// Initial state set by `EventStream::new`; no request has been created yet.
    Init,
    /// Events that were received but not yet yielded. They are kept in reverse order so
    /// that `Vec::pop` hands them out in the order in which they were received.
    Buffer(Vec<Event>),
    /// A boxed `get_events` future whose result has not been consumed yet.
    Future(Pin<Box<dyn Future<Output = Fallible<Vec<Event>>>>>),
}

impl<C> EventStream<C> {
    pub(crate) fn new(connection: Connection<C>, events: Option<Vec<EventType>>) -> Self {
        Self {
            connection: UnsafeCell::new(connection),
            events: events.map(UnsafeCell::new),
            state: State::Init,
            since: None,
        }
    }
}

impl<C: HttpClient> EventStream<C> {
    /// Replaces `self.state` with a newly created `get_events` future.
    ///
    /// The call receives the stored event-type list (if any), the current `since` value
    /// and the given `limit`. The future is only created and boxed here; it is not polled.
    fn queue_next_state(&mut self, limit: Option<u64>) {
        // The shared references are produced by dereferencing the `UnsafeCell` raw
        // pointers, which detaches their lifetime from the `&mut self` borrow. That is what
        // allows a future built from them to be stored back into `self.state`.
        // NOTE(review): the future presumably keeps borrowing `connection` and `events`
        // while it is pending, so soundness depends on neither value being moved, mutated
        // or dropped before the future is - nothing visible here enforces that; confirm.
        let connection = unsafe { &*self.connection.get() };
        // Turns `Option<UnsafeCell<Vec<EventType>>>` into `Option<&[EventType]>`.
        let events = self.events.as_ref().map(|e| &unsafe { &*e.get() }[..]);
        self.state = State::Future(Box::pin(connection.get_events(events, self.since, limit)));
    }
}

impl<C: HttpClient> Stream for EventStream<C> {
    type Item = Fallible<Event>;

    /// Yields the next event, requesting more from the connection whenever the local
    /// buffer is empty.
    ///
    /// * The very first request is created with a limit of `1`; all later requests are
    ///   created without a limit.
    /// * A failed request is reported as `Some(Err(_))` and a new request is queued, so
    ///   the stream can still be polled afterwards.
    /// * The stream never returns `Poll::Ready(None)`.
    ///
    /// `Poll::Pending` is only ever returned as the result of polling the in-flight
    /// request future. This guarantees that the waker from `cx` has been handed to that
    /// future; returning `Pending` right after merely creating a future would leave the
    /// task without any scheduled wake-up.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match &mut self.state {
                // `ready!` returns `Poll::Pending` from `poll_next` if the request has not
                // completed; the future has registered the waker in that case.
                State::Future(fut) => match ready!(fut.as_mut().poll(cx)) {
                    Ok(mut data) => {
                        // Reverse once so that `pop` yields events in received order.
                        data.reverse();
                        // The next loop iteration either yields the first event or, if the
                        // response was empty, queues and polls a new request.
                        self.state = State::Buffer(data);
                    }
                    Err(err) => {
                        // Queue the retry now; it gets polled on the next `poll_next` call.
                        self.queue_next_state(None);
                        return Poll::Ready(Some(Err(err)));
                    }
                },
                State::Buffer(data) => {
                    if let Some(event) = data.pop() {
                        self.since = Some(event.id);
                        return Poll::Ready(Some(Ok(event)));
                    }
                    // Buffer exhausted: create the next request and poll it right away.
                    self.queue_next_state(None);
                }
                // First poll: create the initial request and poll it right away.
                State::Init => self.queue_next_state(Some(1)),
            }
        }
    }
}