1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use futures::*;
use iced_futures::futures;
use std::sync::Arc;

// Function registered as an iced `Subscription`. It looks like arguments can be passed through it.
/// Builds the iced subscription that reports messages arriving on an IRC client stream.
///
/// `client_stream` is the shared, mutex-guarded client stream to read from, and
/// `some_input` is converted to a `String` and stored in the recipe.
///
/// # Panics
///
/// Panics with `"subscribe_irc unwrap client_stream"` when `client_stream` is `None`.
pub fn input<T: ToString>(
    client_stream: Option<&Arc<futures::lock::Mutex<irc::client::ClientStream>>>,
    some_input: T,
) -> iced::Subscription<Progress> {
    // The caller must supply a stream; its absence is treated as a programming error.
    let shared = client_stream.expect("subscribe_irc unwrap client_stream");

    let recipe = SubscribeIrc {
        // Only the reference count is bumped; the underlying stream stays shared.
        client_stream: Some(Arc::clone(shared)),
        some_input: some_input.to_string(),
    };

    iced::Subscription::from_recipe(recipe)
}

// This implementation is fairly rough. It somehow keeps running, but it is not fully understood
// and deserves a more careful design.
// Make sure iced can use our IRC message stream.
/// Lets iced drive a `SubscribeIrc` recipe as a stream of `Progress` events.
impl<H, I> iced_native::subscription::Recipe<H, I> for SubscribeIrc
where
    H: std::hash::Hasher,
{
    type Output = Progress;

    /// Hashes the type id of `Self` together with `some_input` into `state`.
    fn hash(&self, state: &mut H) {
        use std::hash::Hash;

        std::any::TypeId::of::<Self>().hash(state);
        self.some_input.hash(state);
    }

    /// Converts the recipe into a boxed stream of `Progress` events.
    ///
    /// The stream is an `unfold` over `SubscribeIrcState`:
    /// * `Ready` emits `Progress::Started` and moves to `Incoming`;
    /// * `Incoming` locks the shared client stream, waits for the next message and
    ///   emits `Progress::Advanced` with its text, `Progress::Finished` when the
    ///   client stream ends, or `Progress::Errored` when it yields an error or no
    ///   stream handle is available;
    /// * `Finished` never resolves, so the stream stays pending instead of ending.
    ///
    /// The `_input` event stream is not used.
    fn stream(
        self: Box<Self>,
        _input: futures::stream::BoxStream<'static, I>,
    ) -> futures::stream::BoxStream<'static, Self::Output> {
        Box::pin(futures::stream::unfold(
            SubscribeIrcState::Ready {
                client_stream: self.client_stream,
                some_input: self.some_input,
            },
            |state| async move {
                match state {
                    // First poll: announce the start and switch to receiving.
                    SubscribeIrcState::Ready { client_stream, .. } => Some((
                        Progress::Started,
                        SubscribeIrcState::Incoming {
                            client_stream,
                            message_text: String::new(),
                        },
                    )),
                    // Receiving: this arm runs once per incoming message.
                    SubscribeIrcState::Incoming { client_stream, .. } => {
                        // Borrow the shared handle instead of cloning the `Arc` for every
                        // message. The borrow and the lock guard both end with this
                        // statement, so `client_stream` can be moved into the next state.
                        let received = match client_stream.as_ref() {
                            Some(shared) => {
                                let mut guard = shared.lock().await;
                                let item = guard.next().await;
                                item.transpose()
                            }
                            // Without a stream there is nothing to read: report an error
                            // rather than panicking inside the subscription future.
                            None => {
                                return Some((Progress::Errored, SubscribeIrcState::Finished));
                            }
                        };

                        match received {
                            Ok(Some(chunk)) => {
                                let text = chunk.to_string();
                                Some((
                                    Progress::Advanced(text.clone()),
                                    SubscribeIrcState::Incoming {
                                        client_stream,
                                        message_text: text,
                                    },
                                ))
                            }
                            Ok(None) => Some((Progress::Finished, SubscribeIrcState::Finished)),
                            Err(_) => Some((Progress::Errored, SubscribeIrcState::Finished)),
                        }
                    }
                    SubscribeIrcState::Finished => {
                        // We do not let the stream die: per the original note, a finished
                        // stream could otherwise be started again repeatedly after an
                        // error if the user is not careful.
                        let _: () = iced::futures::future::pending().await;

                        None
                    }
                }
            },
        ))
    }
}

/// Events emitted by the IRC subscription stream.
///
/// `PartialEq`/`Eq` are derived so events can be compared directly (for example
/// in tests or with `==`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Progress {
    /// Emitted once, before any message is read.
    Started,
    /// A message was received; the payload is the message rendered as a string.
    Advanced(String),
    /// The underlying client stream ended.
    Finished,
    /// Reading from the client stream failed.
    Errored,
}

/// State threaded through the `unfold` loop that backs the IRC subscription stream.
pub enum SubscribeIrcState {
    /// Initial state, built from the recipe's fields before anything has been emitted.
    Ready {
        /// Shared, mutex-guarded handle to the IRC client stream, if one was supplied.
        client_stream: Option<Arc<futures::lock::Mutex<irc::client::ClientStream>>>,
        /// The recipe's `some_input` string, carried over unchanged.
        some_input: String,
    },
    /// Steady state: messages are being pulled from the client stream.
    Incoming {
        /// Shared, mutex-guarded handle to the IRC client stream being read.
        client_stream: Option<Arc<futures::lock::Mutex<irc::client::ClientStream>>>,
        /// Text of the most recently received message; empty until the first one arrives.
        message_text: String,
    },
    /// Terminal state, entered when no further messages will be produced
    /// (for example after the client stream ends or yields an error).
    Finished,
}

/// Subscription recipe that reads messages from a shared IRC client stream.
pub struct SubscribeIrc {
    /// Caller-supplied string; hashed as part of the recipe's identity.
    some_input: String,
    /// Shared, mutex-guarded handle to the IRC client stream to read from.
    client_stream: Option<Arc<futures::lock::Mutex<irc::client::ClientStream>>>,
}