use std::pin::Pin;
use tokio_stream::Stream;
use super::events::NetworkEvent;
use crate::netlink::protocol::Route;
use crate::netlink::resync::{ConnectionFactory, ResyncStream, events_with_resync};
use crate::netlink::stream::{EventSubscription, OwnedEventStream};
use crate::{Connection, Result};
/// Builds a point-in-time list of [`NetworkEvent`]s from the state reported by `conn`.
///
/// The connection is queried four times, in this order: links, addresses,
/// routes, neighbors. Every returned item is wrapped in the matching `New*`
/// variant and appended to the result, so the output keeps that grouping
/// and the per-query ordering.
///
/// # Errors
///
/// Returns the error of the first query that fails; events gathered from
/// earlier queries are discarded in that case.
pub async fn rtnetlink_snapshot(conn: &Connection<Route>) -> Result<Vec<NetworkEvent>> {
    let mut snapshot = Vec::new();

    let links = conn.get_links().await?;
    snapshot.extend(links.into_iter().map(NetworkEvent::NewLink));

    let addresses = conn.get_addresses().await?;
    snapshot.extend(addresses.into_iter().map(NetworkEvent::NewAddress));

    let routes = conn.get_routes().await?;
    snapshot.extend(routes.into_iter().map(NetworkEvent::NewRoute));

    let neighbors = conn.get_neighbors().await?;
    snapshot.extend(neighbors.into_iter().map(NetworkEvent::NewNeighbor));

    Ok(snapshot)
}
/// Boxed, pinned, `Send + 'static` future that resolves to a snapshot of
/// [`NetworkEvent`]s or an error.
type SnapshotFuture =
Pin<Box<dyn Future<Output = Result<Vec<NetworkEvent>>> + Send + 'static>>;
/// Boxed `FnMut` closure that returns a [`SnapshotFuture`] each time it is called.
///
/// The `Send + Unpin + 'static` bounds are part of the alias, so any value of
/// this type can be moved across threads.
type SnapshotFn = Box<dyn FnMut() -> SnapshotFuture + Send + Unpin + 'static>;
/// Wraps `factory` in a [`SnapshotFn`].
///
/// Each call of the returned closure clones `factory` and produces a future
/// that, once awaited, obtains a connection from the factory and runs
/// [`rtnetlink_snapshot`] on it. An error from the factory or from the
/// snapshot is propagated as the future's output.
fn make_snapshot_fn(factory: ConnectionFactory<Route>) -> SnapshotFn {
    Box::new(move || -> SnapshotFuture {
        // The future must be `'static`, so it owns its own handle to the factory.
        let connect = factory.clone();
        Box::pin(async move {
            let fresh_conn = connect().await?;
            rtnetlink_snapshot(&fresh_conn).await
        })
    })
}
/// [`ResyncStream`] built on an [`OwnedEventStream`] for the `Route` protocol,
/// parameterised with [`NetworkEvent`] and the boxed snapshot closure type.
///
/// This is the return type of `Connection::<Route>::into_events_with_resync`.
pub type OwnedResyncStream =
ResyncStream<'static, OwnedEventStream<Route>, NetworkEvent, SnapshotFn>;
/// [`ResyncStream`] built on an [`EventSubscription`] that borrows for `'a`,
/// parameterised with [`NetworkEvent`] and the boxed snapshot closure type.
///
/// This is the return type of `Connection::<Route>::subscribe_all_with_resync`.
// NOTE(review): the first lifetime parameter of `ResyncStream` is fixed to
// `'static` here as well; its meaning is defined in the `resync` module — confirm
// that it is independent of the `'a` subscription borrow.
pub type BorrowedResyncStream<'a> =
ResyncStream<'static, EventSubscription<'a, Route>, NetworkEvent, SnapshotFn>;
impl Connection<Route> {
    /// Consumes the connection and returns an [`OwnedResyncStream`].
    ///
    /// Calls `subscribe_all`, converts the connection into its owned event
    /// stream via `into_events`, and hands that stream to
    /// [`events_with_resync`] together with a snapshot closure built from
    /// `factory`.
    ///
    /// # Errors
    ///
    /// Returns the error from `subscribe_all` if subscribing fails.
    #[tracing::instrument(level = "info", skip_all)]
    pub async fn into_events_with_resync(
        self,
        factory: ConnectionFactory<Route>,
    ) -> Result<OwnedResyncStream> {
        self.subscribe_all()?;
        let owned_events = self.into_events().await;
        let take_snapshot = make_snapshot_fn(factory);
        Ok(events_with_resync(owned_events, take_snapshot))
    }

    /// Borrowing counterpart of [`Self::into_events_with_resync`].
    ///
    /// Calls `subscribe_all`, obtains the event subscription via `events`,
    /// and hands it to [`events_with_resync`] together with a snapshot
    /// closure built from `factory`. The returned stream borrows `self`.
    ///
    /// # Errors
    ///
    /// Returns the error from `subscribe_all` if subscribing fails.
    #[tracing::instrument(level = "info", skip_all)]
    pub async fn subscribe_all_with_resync(
        &self,
        factory: ConnectionFactory<Route>,
    ) -> Result<BorrowedResyncStream<'_>> {
        self.subscribe_all()?;
        let subscription = self.events().await;
        let take_snapshot = make_snapshot_fn(factory);
        Ok(events_with_resync(subscription, take_snapshot))
    }
}
// Never called: this function exists only so that compilation fails if either
// resync stream alias stops implementing `Stream`.
#[allow(dead_code)]
fn _streams_are_streams() {
    fn requires_stream<T>()
    where
        T: Stream + ?Sized,
    {
    }

    requires_stream::<OwnedResyncStream>();
    requires_stream::<BorrowedResyncStream<'static>>();
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::netlink::resync::ConnectionFuture;

    /// Compile-time check: only type-checks when `T` is `Send`.
    fn assert_send<T: Send + ?Sized>() {}

    /// Compile-time check: only type-checks when `T` is `Send + Sync`.
    fn assert_send_sync<T: Send + Sync + ?Sized>() {}

    /// A factory can be built from a plain closure, cloned, and shared
    /// across threads; the future it returns is `Send`.
    #[test]
    fn factory_is_clone_and_send() {
        let factory: ConnectionFactory<Route> = Arc::new(|| {
            Box::pin(async { Connection::<Route>::new() })
                as Pin<Box<dyn Future<Output = Result<Connection<Route>>> + Send + 'static>>
        });
        let _f2 = factory.clone();
        assert_send_sync::<ConnectionFactory<Route>>();
        assert_send::<ConnectionFuture<Route>>();
    }

    /// The snapshot closure built from a factory, and the future it
    /// produces, are both `Send`.
    #[test]
    fn snapshot_fn_is_send() {
        let factory: ConnectionFactory<Route> = Arc::new(|| {
            Box::pin(async { Connection::<Route>::new() })
        });
        // Binding with the explicit alias pins the return type under test.
        let _snapshot_fn: SnapshotFn = make_snapshot_fn(factory);
        // Previously this test only constructed the closure; these lines
        // make the `Send` property in the test name an actual assertion.
        assert_send::<SnapshotFn>();
        assert_send::<SnapshotFuture>();
    }
}