/*
Copyright (C) 2020-2021 Kunal Mehta <legoktm@member.fsf.org>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
//! # eventstreams
//!
//! The `eventstreams` crate provides a convenient, typed wrapper around
//! Wikimedia's [EventStreams](https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams)
//! live recent changes feed.
//!
//! ```no_run
//! # async fn doc() {
//! use eventstreams::{Event, StreamExt};
//!
//! let stream = eventstreams::stream();
//! eventstreams::pin_mut!(stream);
//! while let Some(event) = stream.next().await {
//!     match event {
//!         Event::Edit(edit) => {
//!             println!(
//!                 "{}: {} edited {}",
//!                 &edit.server_name, &edit.user, &edit.title
//!             );
//!         }
//!         Event::Log(log) => {
//!             println!(
//!                 "{}: {} performed {}/{} on {}",
//!                 &log.server_name,
//!                 &log.user,
//!                 &log.log_type,
//!                 &log.log_action,
//!                 &log.title
//!             );
//!         }
//!     }
//! }
//! # }
//! ```

mod types;

use async_stream::stream;
pub use futures::{Stream, StreamExt};
pub use futures_util::pin_mut;
use serde_json::Value;
use surf_sse::{Event as SSEEvent, EventSource};
pub use types::{EditEvent, Event, LogEvent};

/// Converts a raw server-sent event into a typed [`Event`].
///
/// Returns `None` for empty payloads, payloads that are not valid JSON,
/// and event types other than `log` and `edit`.
///
/// # Panics
///
/// Panics if a `log` or `edit` payload fails to deserialize into the
/// corresponding event type.
fn handle_event(event: SSEEvent) -> Option<Event> {
    if event.data.is_empty() {
        return None;
    }
    let value: Value = match serde_json::from_str(&event.data) {
        Ok(value) => value,
        Err(_) => return None,
    };
    // Dispatch on the "type" field of the JSON payload.
    if value["type"] == "log" {
        Some(Event::Log(serde_json::from_value(value).unwrap()))
    } else if value["type"] == "edit" {
        Some(Event::Edit(serde_json::from_value(value).unwrap()))
    } else {
        None
    }
}

/// Returns a stream of typed events from Wikimedia's `recentchange` feed.
///
/// # Panics
///
/// Panics if the underlying event source yields an error.
pub fn stream() -> impl Stream<Item = Event> {
    let source = EventSource::new(
        "https://stream.wikimedia.org/v2/stream/recentchange"
            .parse()
            .unwrap(),
    );
    stream! {
        for await event in source {
            // Skip events that `handle_event` does not map to a typed `Event`.
            if let Some(event) = handle_event(event.unwrap()) {
                yield event;
            }
        }
    }
}