drop-stream
Do you need to know when your stream is dropped, after you pass it off?
DropStream
will send a message to a futures::channel::oneshot::Reciever
upon being dropped. You can run cleanup code and close connections when your stream's consumer drops it.
This is especially useful when using frameworks that consume data streams.
Example
struct MyDataService {
db_ref: Arc<SqlDatabase> }
impl MyDataService {
async get_change_feed(&self, user_id: String) -> Pin<Box<dyn Stream<Item = User>>> {
let session = self.db_ref.new_session(user_id).await;
let data_stream = session.changes();
let (wrapped_stream, rx) = DropStream::new(data_stream);
tokio::spawn(async move {
let _ = rx.await; log::debug!("Closing stream...");
let _close_success = session.close().await;
});
wrapped_stream.boxed()
}
}