fugle 0.3.8

A Simple, Lightweight, Fast and Safe Fugle Library
Documentation
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use futures_util::StreamExt;
use log::error;
use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::connect_async;

use crate::schema::Result;

pub(crate) struct Async {
    pub(crate) routine: Option<tokio::task::JoinHandle<()>>,
}

impl Async {
    pub(crate) async fn new<T>(
        uri: &str,
        sender: UnboundedSender<T>,
        done: Arc<AtomicBool>,
    ) -> Result<Async>
    where
        T: for<'de> serde::Deserialize<'de> + Send + 'static,
    {
        let (mut socket, _) = connect_async(uri).await?;

        let routine = tokio::spawn(async move {
            while !done.load(Ordering::SeqCst) {
                if let Some(Ok(msg)) = socket.next().await {
                    if let Ok(m) = msg.to_text() {
                        if let Ok(m) = serde_json::from_str(m) {
                            if let Err(e) = sender.send(m) {
                                error!("{}", e);
                            }
                        }
                    }
                }
            }
            let _ = socket.close(None).await;
        });

        Ok(Async {
            routine: Some(routine),
        })
    }
}

impl super::Worker for Async {
    fn stop(&mut self) {
        if let Some(routine) = self.routine.take() {
            drop(routine)
        }
    }
}

#[cfg(test)]
mod test {
    use tokio::{
        sync::mpsc::unbounded_channel,
        time::{sleep, Duration},
    };

    use super::{
        super::{QuoteResponse, Worker, INTRADAY_QUOTE},
        *,
    };

    #[tokio::test]
    async fn test_async_worker_stop() {
        let (tx, _) = unbounded_channel::<QuoteResponse>();
        let done = Arc::new(AtomicBool::new(false));
        let mut worker = Async::new(
            &format!("{}?symbolId=2884&apiToken=demo", INTRADAY_QUOTE),
            tx,
            done.clone(),
        )
        .await
        .unwrap();

        done.store(true, Ordering::SeqCst);
        sleep(Duration::from_millis(3)).await;

        worker.stop();
    }
}