maelstrom_github/
queue.rs

1//! This module contains a message queue that is backed by GitHub artifacts. It allows for
2//! communication between jobs within the same workflow run.
3
4use crate::{two_hours_from_now, Artifact, BackendIds, GitHubClient};
5use anyhow::{anyhow, Result};
6use azure_core::Etag;
7use azure_storage_blobs::prelude::BlobClient;
8use futures::stream::StreamExt as _;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashSet, VecDeque};
11use std::future::Future;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Abstraction over the artifact store that backs a queue.
///
/// The queue types in this module are generic over this trait; it is implemented here for
/// [`GitHubClient`].
#[allow(async_fn_in_trait)]
pub trait QueueConnection {
    /// Handle type used to read from and append to a single blob.
    type Blob: QueueBlob;

    /// Returns a handle to the blob named `key` that belongs to `backend_ids`.
    async fn get_blob(&self, backend_ids: BackendIds, key: &str) -> Result<Self::Blob>;
    /// Creates a new blob named `key` and returns a handle to it.
    async fn create_blob(&self, key: &str) -> Result<Self::Blob>;
    /// Lists the artifacts visible through this connection.
    async fn list(&self) -> Result<Vec<Artifact>>;
}
23
/// Outcome of a single [`QueueBlob::read`] call.
pub enum ReadResponse {
    /// Bytes were read; `etag` is the blob's etag as reported with this read.
    Data { data: Vec<u8>, etag: Etag },
    /// There was nothing to read (the blob is unchanged, or the requested offset is past its end).
    NoData,
    /// The backend rejected the request with an authentication failure.
    AuthenticationFailed,
}
29
/// A single blob that can be read from an offset and appended to.
#[allow(async_fn_in_trait)]
pub trait QueueBlob: Send + Sync + 'static {
    /// Reads the blob's contents from byte offset `index` onwards.
    ///
    /// When `etag` is `Some`, the implementation reports [`ReadResponse::NoData`] if the blob's
    /// current etag still equals it.
    async fn read(&self, index: usize, etag: &Option<Etag>) -> Result<ReadResponse>;
    /// Appends `data` to the blob.
    ///
    /// Declared as returning a `Send` future so that it can be awaited from a spawned task.
    fn write(&self, data: Vec<u8>) -> impl Future<Output = Result<()>> + Send;
}
35
36impl QueueConnection for GitHubClient {
37    type Blob = BlobClient;
38
39    async fn get_blob(&self, backend_ids: BackendIds, key: &str) -> Result<Self::Blob> {
40        self.start_download(backend_ids, key).await
41    }
42
43    async fn create_blob(&self, key: &str) -> Result<Self::Blob> {
44        let blob = self.start_upload(key, Some(two_hours_from_now())).await?;
45        blob.put_append_blob().await?;
46        self.finish_upload(key, 0).await?;
47        Ok(blob)
48    }
49
50    async fn list(&self) -> Result<Vec<Artifact>> {
51        GitHubClient::list(self).await
52    }
53}
54
55impl QueueBlob for BlobClient {
56    async fn read(&self, index: usize, etag: &Option<Etag>) -> Result<ReadResponse> {
57        let mut builder = self.get().range(index..);
58
59        if let Some(etag) = etag {
60            builder = builder.if_match(azure_core::request_options::IfMatchCondition::NotMatch(
61                etag.to_string(),
62            ));
63        }
64
65        let mut stream = builder.into_stream();
66        let resp = stream
67            .next()
68            .await
69            .ok_or_else(|| anyhow!("missing read response"))?;
70        match resp {
71            Ok(resp) => {
72                let msg = resp.data.collect().await?;
73                Ok(ReadResponse::Data {
74                    data: msg.to_vec(),
75                    etag: resp.blob.properties.etag,
76                })
77            }
78            Err(err) => {
79                use azure_core::{error::ErrorKind, StatusCode};
80
81                match err.kind() {
82                    ErrorKind::HttpResponse {
83                        status: StatusCode::NotModified,
84                        error_code: Some(error_code),
85                    } if error_code == "ConditionNotMet" => {
86                        return Ok(ReadResponse::NoData);
87                    }
88                    ErrorKind::HttpResponse {
89                        status: StatusCode::RequestedRangeNotSatisfiable,
90                        error_code: Some(error_code),
91                    } if error_code == "InvalidRange" => {
92                        return Ok(ReadResponse::NoData);
93                    }
94                    ErrorKind::HttpResponse {
95                        status: StatusCode::Forbidden,
96                        error_code: Some(error_code),
97                    } if error_code == "AuthenticationFailed" => {
98                        return Ok(ReadResponse::AuthenticationFailed);
99                    }
100                    _ => {}
101                }
102                Err(err.into())
103            }
104        }
105    }
106
107    async fn write(&self, to_send: Vec<u8>) -> Result<()> {
108        self.append_block(to_send).await?;
109        Ok(())
110    }
111}
112
/// Header that precedes every entry written to a queue blob.
///
/// Headers are serialized with `bincode`, so the variant order and field layout are part of the
/// on-blob format.
#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug)]
enum MessageHeader {
    /// Written periodically by the writer's keep-alive task; readers use it to restart their
    /// timeout.
    KeepAlive,
    /// Followed by `size` bytes of message payload.
    Payload { size: usize },
    /// Written by the writer's `shut_down`; readers report it as `None`.
    Shutdown,
}
119
/// Read timeout used for queues created by `GitHubQueue::connect` and by the acceptor. Their
/// write halves send keep-alives at a quarter of this interval.
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
121
/// The receiving half of a queue: polls a blob and decodes the messages appended to it.
pub struct GitHubReadQueue<ConnT: QueueConnection = GitHubClient> {
    /// Connection used to re-open the blob after an authentication failure.
    conn: Arc<ConnT>,
    /// Handle to the blob being read.
    blob: ConnT::Blob,
    /// Byte offset of the first byte that has not been read yet.
    index: usize,
    /// Etag returned by the most recent successful read, if any.
    etag: Option<Etag>,
    /// Decoded entries not yet handed to the caller; `None` marks a shutdown.
    pending: VecDeque<Option<Vec<u8>>>,
    /// How long `read_msg` keeps polling without a keep-alive before failing.
    read_timeout: Duration,
    /// Backend ids passed to `get_blob` when (re-)opening the blob.
    backend_ids: BackendIds,
    /// Name of the blob being read.
    key: String,
}
132
133impl<ConnT> GitHubReadQueue<ConnT>
134where
135    ConnT: QueueConnection,
136{
137    async fn new(
138        conn: Arc<ConnT>,
139        read_timeout: Duration,
140        backend_ids: BackendIds,
141        key: &str,
142    ) -> Result<Self> {
143        let blob = conn.get_blob(backend_ids.clone(), key).await?;
144        Ok(Self {
145            conn,
146            blob,
147            index: 0,
148            etag: None,
149            pending: Default::default(),
150            read_timeout,
151            backend_ids,
152            key: key.into(),
153        })
154    }
155
156    async fn maybe_read_msg(&mut self) -> Result<Option<Vec<u8>>> {
157        let (msg, etag) = match self.blob.read(self.index, &self.etag).await? {
158            ReadResponse::Data { data, etag } => (data, etag),
159            ReadResponse::NoData => return Ok(None),
160            ReadResponse::AuthenticationFailed => {
161                self.blob = self
162                    .conn
163                    .get_blob(self.backend_ids.clone(), &self.key)
164                    .await?;
165                return Ok(None);
166            }
167        };
168
169        self.etag = Some(etag);
170        self.index += msg.len();
171        Ok(Some(msg))
172    }
173
174    pub async fn read_msg(&mut self) -> Result<Option<Vec<u8>>> {
175        if let Some(msg) = self.pending.pop_front() {
176            return Ok(msg);
177        }
178
179        let mut read_start = Instant::now();
180        loop {
181            if let Some(res) = self.maybe_read_msg().await? {
182                let mut r = &res[..];
183                while !r.is_empty() {
184                    let header: MessageHeader = bincode::deserialize_from(&mut r)?;
185                    match header {
186                        MessageHeader::KeepAlive => {
187                            read_start = Instant::now();
188                        }
189                        MessageHeader::Payload { size } => {
190                            let payload = r[..size].to_vec();
191                            r = &r[size..];
192                            self.pending.push_back(Some(payload));
193                        }
194                        MessageHeader::Shutdown => {
195                            self.pending.push_back(None);
196                        }
197                    }
198                }
199            }
200
201            if let Some(msg) = self.pending.pop_front() {
202                return Ok(msg);
203            }
204
205            if read_start.elapsed() > self.read_timeout {
206                return Err(anyhow!("GitHub queue read timeout"));
207            }
208        }
209    }
210}
211
212async fn send_keep_alive(duration: Duration, blob: Arc<impl QueueBlob>) {
213    loop {
214        tokio::time::sleep(duration).await;
215        let _ = blob
216            .write(bincode::serialize(&MessageHeader::KeepAlive).unwrap())
217            .await;
218    }
219}
220
/// The sending half of a queue: appends framed messages to a blob and keeps a background task
/// running that appends keep-alives while no messages are being written.
pub struct GitHubWriteQueue<BlobT = BlobClient> {
    /// Blob that messages and keep-alives are appended to; shared with the keep-alive task.
    blob: Arc<BlobT>,
    /// Handle used to abort the currently running keep-alive task.
    keep_alive: tokio::task::AbortHandle,
    /// Interval between keep-alives.
    keep_alive_duration: Duration,
}
226
227impl<BlobT: QueueBlob> GitHubWriteQueue<BlobT> {
228    async fn new<ConnT>(conn: &ConnT, keep_alive_duration: Duration, key: &str) -> Result<Self>
229    where
230        ConnT: QueueConnection<Blob = BlobT>,
231    {
232        let blob = Arc::new(conn.create_blob(key).await?);
233        let keep_alive =
234            tokio::task::spawn(send_keep_alive(keep_alive_duration, blob.clone())).abort_handle();
235        Ok(Self {
236            blob,
237            keep_alive,
238            keep_alive_duration,
239        })
240    }
241
242    pub async fn write_msg(&mut self, data: &[u8]) -> Result<()> {
243        let mut to_send = bincode::serialize(&MessageHeader::Payload { size: data.len() }).unwrap();
244        to_send.extend(data);
245        self.blob.write(to_send).await?;
246
247        self.keep_alive.abort();
248        self.keep_alive =
249            tokio::task::spawn(send_keep_alive(self.keep_alive_duration, self.blob.clone()))
250                .abort_handle();
251
252        Ok(())
253    }
254
255    pub async fn write_many_msgs(&mut self, messages: &[Vec<u8>]) -> Result<()> {
256        let mut to_send = vec![];
257        for data in messages {
258            to_send
259                .extend(bincode::serialize(&MessageHeader::Payload { size: data.len() }).unwrap());
260            to_send.extend(data);
261        }
262        self.blob.write(to_send).await?;
263
264        self.keep_alive.abort();
265        self.keep_alive =
266            tokio::task::spawn(send_keep_alive(self.keep_alive_duration, self.blob.clone()))
267                .abort_handle();
268
269        Ok(())
270    }
271
272    pub async fn shut_down(&mut self) -> Result<()> {
273        self.keep_alive.abort();
274        self.blob
275            .write(bincode::serialize(&MessageHeader::Shutdown).unwrap())
276            .await?;
277        Ok(())
278    }
279}
280
/// Dropping the queue aborts its background keep-alive task.
impl<BlobT> Drop for GitHubWriteQueue<BlobT> {
    fn drop(&mut self) {
        // Without this the spawned task would keep running after the queue is gone.
        self.keep_alive.abort();
    }
}
286
287async fn wait_for_artifact(conn: &impl QueueConnection, key: &str) -> Result<()> {
288    while !conn.list().await?.iter().any(|a| a.name == key) {}
289    Ok(())
290}
291
/// A bidirectional queue made of a read half and a write half, each backed by its own blob.
pub struct GitHubQueue<ConnT: QueueConnection = GitHubClient> {
    /// Receives messages from the peer.
    read: GitHubReadQueue<ConnT>,
    /// Sends messages to the peer.
    write: GitHubWriteQueue<ConnT::Blob>,
}
296
297impl<ConnT> GitHubQueue<ConnT>
298where
299    ConnT: QueueConnection,
300{
301    async fn new(
302        conn: Arc<ConnT>,
303        read_timeout: Duration,
304        read_backend_ids: BackendIds,
305        read_key: &str,
306        write_key: &str,
307    ) -> Result<Self> {
308        Ok(Self {
309            write: GitHubWriteQueue::new(&*conn, read_timeout / 4, write_key).await?,
310            read: GitHubReadQueue::new(conn, read_timeout, read_backend_ids, read_key).await?,
311        })
312    }
313
314    async fn maybe_connect(conn: Arc<ConnT>, id: &str) -> Result<Option<Self>> {
315        let artifacts = conn.list().await?;
316        if let Some(listener) = artifacts.iter().find(|a| a.name == format!("{id}-listen")) {
317            let Artifact {
318                name, backend_ids, ..
319            } = listener;
320            let key = name.strip_suffix("-listen").unwrap();
321            let self_id = uuid::Uuid::new_v4().to_string();
322
323            let write_key = format!("{self_id}-{key}-up");
324            let write = GitHubWriteQueue::new(&*conn, DEFAULT_READ_TIMEOUT / 4, &write_key).await?;
325
326            let read_key = format!("{self_id}-{key}-down");
327            wait_for_artifact(&*conn, &read_key).await?;
328            let read =
329                GitHubReadQueue::new(conn, DEFAULT_READ_TIMEOUT, backend_ids.clone(), &read_key)
330                    .await?;
331
332            Ok(Some(Self { write, read }))
333        } else {
334            Ok(None)
335        }
336    }
337
338    pub async fn connect(conn: Arc<ConnT>, id: &str) -> Result<Self> {
339        loop {
340            if let Some(socket) = Self::maybe_connect(conn.clone(), id).await? {
341                return Ok(socket);
342            }
343        }
344    }
345
346    pub async fn read_msg(&mut self) -> Result<Option<Vec<u8>>> {
347        self.read.read_msg().await
348    }
349
350    pub async fn write_msg(&mut self, data: &[u8]) -> Result<()> {
351        self.write.write_msg(data).await
352    }
353
354    pub async fn shut_down(&mut self) -> Result<()> {
355        self.write.shut_down().await
356    }
357
358    pub fn into_split(self) -> (GitHubReadQueue<ConnT>, GitHubWriteQueue<ConnT::Blob>) {
359        (self.read, self.write)
360    }
361}
362
/// Accepts incoming queue connections addressed to a given id.
pub struct GitHubQueueAcceptor<ConnT = GitHubClient> {
    /// The id this acceptor listens as.
    id: String,
    /// Names of the connection artifacts that have already been accepted.
    accepted: HashSet<String>,
    /// Connection used to list artifacts and create queues.
    conn: Arc<ConnT>,
}
368
369impl<ConnT> GitHubQueueAcceptor<ConnT>
370where
371    ConnT: QueueConnection,
372{
373    pub async fn new(conn: Arc<ConnT>, id: &str) -> Result<Self> {
374        let key = format!("{id}-listen");
375        conn.create_blob(&key).await?;
376        Ok(Self {
377            id: id.into(),
378            accepted: HashSet::new(),
379            conn,
380        })
381    }
382
383    async fn maybe_accept_one(&mut self) -> Result<Option<GitHubQueue<ConnT>>> {
384        let artifacts = self.conn.list().await?;
385        if let Some(connected) = artifacts.iter().find(|a| {
386            a.name.ends_with(&format!("{}-up", self.id)) && !self.accepted.contains(&a.name)
387        }) {
388            let Artifact {
389                name, backend_ids, ..
390            } = connected;
391            let key = name.strip_suffix("-up").unwrap();
392            let socket = GitHubQueue::new(
393                self.conn.clone(),
394                DEFAULT_READ_TIMEOUT,
395                backend_ids.clone(),
396                &format!("{key}-up"),
397                &format!("{key}-down"),
398            )
399            .await?;
400            self.accepted.insert(name.into());
401            Ok(Some(socket))
402        } else {
403            Ok(None)
404        }
405    }
406
407    pub async fn accept_one(&mut self) -> Result<GitHubQueue<ConnT>> {
408        loop {
409            if let Some(socket) = self.maybe_accept_one().await? {
410                return Ok(socket);
411            }
412        }
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::bail;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory stand-in for a GitHub connection: a map from blob name to blob.
    #[derive(Default)]
    struct FakeConnection {
        blobs: Mutex<HashMap<String, FakeBlob>>,
    }

    /// In-memory blob. Clones share the same underlying bytes.
    #[derive(Clone, Default)]
    struct FakeBlob {
        data: Arc<Mutex<Vec<u8>>>,
    }

    impl FakeBlob {
        /// Number of bytes currently stored in the blob.
        fn len(&self) -> usize {
            self.data.lock().unwrap().len()
        }

        /// Copy of the blob's current contents.
        fn data(&self) -> Vec<u8> {
            self.data.lock().unwrap().clone()
        }
    }

    /// The backend ids that every fake artifact reports and every fake lookup expects.
    fn b_ids() -> BackendIds {
        BackendIds {
            workflow_run_backend_id: "b1".into(),
            workflow_job_run_backend_id: "b2".into(),
        }
    }

    impl QueueConnection for FakeConnection {
        type Blob = FakeBlob;

        async fn get_blob(&self, backend_ids: BackendIds, key: &str) -> Result<Self::Blob> {
            // Yield so that other tasks get a chance to run, as they would around real I/O.
            tokio::task::yield_now().await;

            assert_eq!(backend_ids, b_ids());
            Ok(self
                .blobs
                .lock()
                .unwrap()
                .get(key)
                .ok_or_else(|| anyhow!("blob not found"))?
                .clone())
        }

        async fn create_blob(&self, key: &str) -> Result<Self::Blob> {
            tokio::task::yield_now().await;

            let mut blobs = self.blobs.lock().unwrap();

            if blobs.contains_key(key) {
                bail!("blob already exists");
            }
            let new_blob = FakeBlob::default();
            blobs.insert(key.into(), new_blob.clone());
            Ok(new_blob)
        }

        async fn list(&self) -> Result<Vec<Artifact>> {
            tokio::task::yield_now().await;

            Ok(self
                .blobs
                .lock()
                .unwrap()
                .iter()
                .map(|(name, blob)| Artifact {
                    name: name.clone(),
                    backend_ids: b_ids(),
                    size: blob.len().try_into().unwrap(),
                    database_id: 1.into(),
                })
                .collect())
        }
    }

    impl QueueBlob for FakeBlob {
        async fn read(&self, index: usize, etag: &Option<Etag>) -> Result<ReadResponse> {
            use sha2::Digest as _;

            tokio::task::yield_now().await;

            let data = self.data.lock().unwrap();

            // The fake etag is the SHA-256 of the whole blob, so it changes whenever the contents
            // change.
            let mut hasher = sha2::Sha256::new();
            hasher.update(&data[..]);
            let actual_etag: Etag = maelstrom_base::Sha256Digest::new(hasher.finalize().into())
                .to_string()
                .into();

            if let Some(not_etag) = etag {
                if not_etag == &actual_etag {
                    return Ok(ReadResponse::NoData);
                }
            }

            if !data.is_empty() {
                assert!(index < data.len());
            }
            Ok(ReadResponse::Data {
                data: data[index..].to_vec(),
                etag: actual_etag,
            })
        }

        async fn write(&self, data: Vec<u8>) -> Result<()> {
            tokio::task::yield_now().await;

            self.data.lock().unwrap().extend(data);
            Ok(())
        }
    }

    const SHORT_DURATION: Duration = Duration::from_millis(100);
    const FOREVER: Duration = Duration::from_secs(u64::MAX);

    /// A single framed message written to the blob is returned by `read_msg`.
    #[tokio::test]
    async fn read_single_msg() {
        let conn = FakeConnection::default();
        let b = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        b.write(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap())
            .await
            .unwrap();
        let sent_msg = vec![1, 2, 3, 4, 5];
        b.write(sent_msg.clone()).await.unwrap();

        let read_msg = queue.read_msg().await.unwrap().unwrap();
        assert_eq!(read_msg, sent_msg);
    }

    /// Several messages written before the first read are returned one per `read_msg` call.
    #[tokio::test]
    async fn read_multiple_msgs() {
        let conn = FakeConnection::default();
        let b = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        let sent_msg = vec![1, 2, 3, 4, 5];
        for _ in 0..3 {
            b.write(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap())
                .await
                .unwrap();
            b.write(sent_msg.clone()).await.unwrap();
        }

        for _ in 0..3 {
            let read_msg = queue.read_msg().await.unwrap().unwrap();
            assert_eq!(read_msg, sent_msg);
        }
    }

    /// Alternating writes and reads deliver each message as it arrives.
    #[tokio::test]
    async fn read_multiple_msgs_interleaved() {
        let conn = FakeConnection::default();
        let b = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        let sent_msg = vec![1, 2, 3, 4, 5];
        for _ in 0..3 {
            b.write(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap())
                .await
                .unwrap();
            b.write(sent_msg.clone()).await.unwrap();

            let read_msg = queue.read_msg().await.unwrap().unwrap();
            assert_eq!(read_msg, sent_msg);
        }
    }

    /// Keep-alive entries between messages are skipped and never surface to the caller.
    #[tokio::test]
    async fn read_ignores_keep_alive_msgs() {
        let conn = FakeConnection::default();
        let b = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        let sent_msg = vec![1, 2, 3, 4, 5];
        for _ in 0..3 {
            b.write(bincode::serialize(&MessageHeader::KeepAlive).unwrap())
                .await
                .unwrap();
            b.write(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap())
                .await
                .unwrap();
            b.write(sent_msg.clone()).await.unwrap();
        }

        for _ in 0..3 {
            let read_msg = queue.read_msg().await.unwrap().unwrap();
            assert_eq!(read_msg, sent_msg);
        }
    }

    /// A shutdown entry after a message is reported as `None` once the message has been read.
    #[tokio::test]
    async fn read_with_shutdown() {
        let conn = FakeConnection::default();
        let b = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        let sent_msg = vec![1, 2, 3, 4, 5];
        b.write(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap())
            .await
            .unwrap();
        b.write(sent_msg.clone()).await.unwrap();
        b.write(bincode::serialize(&MessageHeader::Shutdown).unwrap())
            .await
            .unwrap();

        let read_msg = queue.read_msg().await.unwrap().unwrap();
        assert_eq!(read_msg, sent_msg);
        assert_eq!(queue.read_msg().await.unwrap(), None);
    }

    /// Reading from a blob that never receives anything fails with a timeout error.
    #[tokio::test]
    async fn read_timeout() {
        let conn = FakeConnection::default();
        let _ = conn.create_blob("foo").await.unwrap();
        let mut queue = GitHubReadQueue::new(Arc::new(conn), SHORT_DURATION, b_ids(), "foo")
            .await
            .unwrap();

        queue.read_msg().await.unwrap_err();
    }

    /// `write_msg` appends a payload header followed by the payload bytes.
    #[tokio::test]
    async fn write_msg() {
        let conn = FakeConnection::default();
        let mut queue = GitHubWriteQueue::new(&conn, FOREVER, "foo").await.unwrap();
        let sent = [1, 2, 3, 4, 5];
        queue.write_msg(&sent[..]).await.unwrap();

        let mut expected = bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap();
        expected.extend(sent);

        let b = conn.get_blob(b_ids(), "foo").await.unwrap();
        assert_eq!(b.data(), expected);
    }

    /// `write_many_msgs` appends every message, each with its own header, in order.
    #[tokio::test]
    async fn write_many_msgs() {
        let conn = FakeConnection::default();
        let mut queue = GitHubWriteQueue::new(&conn, FOREVER, "foo").await.unwrap();
        let sent = vec![1, 2, 3, 4, 5];
        queue.write_many_msgs(&vec![sent.clone(); 3]).await.unwrap();

        let mut expected = vec![];
        for _ in 0..3 {
            expected.extend(bincode::serialize(&MessageHeader::Payload { size: 5 }).unwrap());
            expected.extend(sent.clone());
        }

        let b = conn.get_blob(b_ids(), "foo").await.unwrap();
        assert_eq!(b.data(), expected);
    }

    /// `shut_down` appends exactly one shutdown header.
    #[tokio::test]
    async fn write_shutdown() {
        let conn = FakeConnection::default();
        let mut queue = GitHubWriteQueue::new(&conn, FOREVER, "foo").await.unwrap();
        queue.shut_down().await.unwrap();

        let expected = bincode::serialize(&MessageHeader::Shutdown).unwrap();

        let b = conn.get_blob(b_ids(), "foo").await.unwrap();
        assert_eq!(b.data(), expected);
    }

    /// An idle write queue keeps appending keep-alive headers, and nothing else, until dropped.
    #[tokio::test]
    async fn keep_alive() {
        let conn = FakeConnection::default();
        let queue = GitHubWriteQueue::new(&conn, Duration::from_micros(1), "foo")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(150)).await;
        drop(queue);

        let b = conn.get_blob(b_ids(), "foo").await.unwrap();
        let data = b.data();
        let mut cursor = &data[..];

        let mut keep_alive_count = 0;
        while !cursor.is_empty() {
            let header: MessageHeader = bincode::deserialize_from(&mut cursor).unwrap();
            assert_eq!(header, MessageHeader::KeepAlive);
            keep_alive_count += 1;
        }

        assert!(keep_alive_count > 50, "{keep_alive_count}");
    }

    /// An acceptor and a connector sharing one fake connection can exchange a message.
    #[tokio::test]
    async fn accept_and_connect() {
        let conn = Arc::new(FakeConnection::default());

        let their_conn = conn.clone();
        tokio::task::spawn(async move {
            let mut acceptor = GitHubQueueAcceptor::new(their_conn, "foo").await.unwrap();
            let mut queue_b = acceptor.accept_one().await.unwrap();
            queue_b.write_msg(&b"hello"[..]).await.unwrap();
        });

        let mut queue_a = GitHubQueue::connect(conn, "foo").await.unwrap();
        let msg = queue_a.read_msg().await.unwrap().unwrap();
        assert_eq!(msg, b"hello");
    }

    /// Acceptor side of the integration test: accepts two connections and plays three rounds of
    /// ping/pong on each before shutting it down.
    async fn acceptor(client: GitHubClient) {
        let mut acceptor = GitHubQueueAcceptor::new(Arc::new(client), "foo")
            .await
            .unwrap();

        let mut handles = vec![];
        for _ in 0..2 {
            let mut queue = acceptor.accept_one().await.unwrap();
            handles.push(tokio::task::spawn(async move {
                for _ in 0..3 {
                    queue.write_msg(&b"ping"[..]).await.unwrap();
                    let msg = queue.read_msg().await.unwrap().unwrap();
                    assert_eq!(msg, b"pong");
                }
                queue.shut_down().await.unwrap();
            }));
        }

        for h in handles {
            h.await.unwrap();
        }
    }

    /// Connector side of the integration test: answers every "ping" with "pong" until the
    /// acceptor shuts the queue down.
    async fn connector(client: GitHubClient) {
        let mut sock = GitHubQueue::connect(Arc::new(client), "foo").await.unwrap();
        while let Some(msg) = sock.read_msg().await.unwrap() {
            assert_eq!(msg, b"ping");
            sock.write_msg(&b"pong"[..]).await.unwrap();
        }
    }

    /// Runs one role of the three-actor integration test against the real backend.
    ///
    /// Skipped when no client can be built. The role is chosen by the `TEST_ACTOR` environment
    /// variable: `1` is the acceptor, `2` and `3` are connectors.
    #[tokio::test]
    async fn real_github_integration_test() {
        let Some(client) = crate::client::tests::client_factory() else {
            println!("skipping due to missing GitHub credentials");
            return;
        };
        println!("test found GitHub credentials");

        let actor = std::env::var("TEST_ACTOR")
            .expect("TEST_ACTOR must be set when GitHub credentials are present");
        match actor.as_str() {
            "1" => acceptor(client).await,
            "2" | "3" => connector(client).await,
            _ => panic!("unknown test actor"),
        }
    }
}