lay_mitouosc/
lib.rs

1use std::convert::TryFrom;
2use std::net::SocketAddr;
3
4use tokio::task::{self, JoinHandle};
5use tokio::net::UdpSocket;
6use tokio::sync::mpsc;
7
8use anyhow::{anyhow, bail, ensure};
9
10#[allow(unused_imports)]
11use log::{LevelFilter, info, warn};
12
13use message::{Response, Request};
14use rosc::{OscMessage, OscPacket};
15
16use lay::{
17    Layer,
18    Measured,
19    operations::{opid, OpArgs},
20    gates::{PauliGate, HGate, SGate, TGate, CXGate}
21};
22
23pub mod message;
24
25const SEND_QUEUE_LEN: usize = 1000;
26const RECV_QUEUE_LEN: usize = 1000;
27const OSC_BUF_LEN: usize = 1000;
28
29async fn device_comm_loop(tx_addr: SocketAddr,
30                          rx_addr: SocketAddr,
31                          mut req_rx: mpsc::Receiver<Option<Request>>,
32                          meas_tx: mpsc::Sender<Option<((u32, u32), bool)>>) -> anyhow::Result<()> {
33    let rx_sock = UdpSocket::bind(rx_addr).await?;
34    let mut buf = vec![0; OSC_BUF_LEN];
35    while let Some(msg) = req_rx.recv().await {
36        info!("device_sender_loop: Received from channel: {:?}", msg);
37        match msg {
38            Some(msg) => {
39                let packet = rosc::encoder::encode(&OscPacket::Message(OscMessage::from(&msg))
40                        ).map_err(|e| anyhow!("{:?}", e))?;
41                rx_sock.send_to(&packet, tx_addr).await?;
42                if let Request::Mz(x, y) = msg {
43                    let res = receive_response(&mut buf, &rx_sock).await?;
44                    info!("Received from device: {:?}", res);
45                    let measured = match res { Response::Mz(_, f) => (f as u32) == 1 };
46                    meas_tx.send(Some(((x as u32, y as u32), measured))).await?;
47                }
48            },
49            None => {
50                meas_tx.send(None).await?;
51            },
52        }
53    }
54    bail!("device_sender_loop unexpected finished");
55}
56
57async fn receive_response(buf: &mut Vec<u8>, sock: &UdpSocket) -> anyhow::Result<Response> {
58    let len = sock.recv(buf).await?;
59    let packet = rosc::decoder::decode(&buf[..len]).map_err(|e| anyhow!("{:?}", e))?;
60    let msg = Response::try_from(match packet {
61        OscPacket::Message(msg) => {
62            warn!("Message without Bundle");
63            msg
64        },
65        OscPacket::Bundle(mut bundle) => {
66            ensure!(bundle.content.len() != 0, "Received empty bundle.");
67            ensure!(bundle.content.len() == 1, "Multiple messages in same bundle.");
68            match bundle.content.pop().unwrap() {
69                OscPacket::Message(msg) => msg,
70                OscPacket::Bundle(_bundle) => bail!("Received nested bundle.")
71            }
72        }
73    })?;
74    Ok(msg)
75}
76
77#[derive(Debug)]
78pub struct MitouOscLayer {
79    handle: JoinHandle<anyhow::Result<()>>,
80    size: (u32, u32),
81    sender: mpsc::Sender<Option<Request>>,
82    receiver: mpsc::Receiver<Option<((u32, u32), bool)>>,
83}
84
85impl MitouOscLayer {
86    pub fn exec(size: (u32, u32), device_tx: SocketAddr, device_rx: SocketAddr)
87            -> anyhow::Result<MitouOscLayer> {
88        exec(size, device_tx, device_rx)
89    }
90}
91
92impl Drop for MitouOscLayer {
93    fn drop(&mut self) {
94        self.handle.abort();
95    }
96}
97
98impl Layer for MitouOscLayer {
99    type Operation = OpArgs<Self>;
100    type Qubit = (u32, u32);
101    type Slot = (u32, u32);
102    type Buffer = MitouOscBuffer;
103    type Requested = anyhow::Result<()>;
104    type Response = anyhow::Result<()>;
105
106    fn send(&mut self, ops: &[Self::Operation]) -> Self::Requested {
107        for op in ops {
108            match op {
109                OpArgs::Empty(id) if *id == opid::INIT => {
110                    for y in 0..(self.size.1 as i32) {
111                        for x in 0..(self.size.0 as i32) {
112                            self.sender.blocking_send(Some(Request::InitZero(x, y)))?;
113                        }
114                    }
115                }
116                OpArgs::Q(id, q) => {
117                    let x = q.0 as i32;
118                    let y = q.1 as i32;
119                    match *id {
120                        opid::X => {
121                            self.sender.blocking_send(Some(Request::X(x, y)))?;
122                        },
123                        opid::Y => {
124                            self.sender.blocking_send(Some(Request::Y(x, y)))?;
125                        },
126                        opid::Z => {
127                            self.sender.blocking_send(Some(Request::Z(x, y)))?;
128                        },
129                        opid::S => {
130                            self.sender.blocking_send(Some(Request::S(x, y)))?;
131                        },
132                        opid::SDG => {
133                            self.sender.blocking_send(Some(Request::Sdg(x, y)))?;
134                        },
135                        opid::T => {
136                            self.sender.blocking_send(Some(Request::T(x, y)))?;
137                        },
138                        opid::TDG => {
139                            self.sender.blocking_send(Some(Request::Tdg(x, y)))?;
140                        },
141                        _ => {
142                            bail!("Unexpected single qubit gate");
143                        }
144                    }
145                },
146                OpArgs::QS(id, q, s) if *id == opid::MEAS => {
147                    ensure!(q == s, "Qubit and slot must be same.");
148                    let x = q.0 as i32;
149                    let y = q.1 as i32;
150                    self.sender.blocking_send(Some(Request::Mz(x, y)))?;
151                },
152                OpArgs::QQ(id, c, t) if *id == opid::CX => {
153                    self.sender
154                        .blocking_send(Some(Request::CX(c.0 as i32, c.1 as i32, t.0 as i32, t.1 as i32)))?;
155                },
156                _ => {
157                    bail!("Unexpected operation");
158                }
159            }
160        }
161        self.sender.blocking_send(None)?;
162        Ok(())
163    }
164
165    fn receive(&mut self, buf: &mut Self::Buffer) -> Self::Response {
166        loop {
167            match self.receiver.blocking_recv() {
168                Some(Some(((x, y), m))) => {
169                    (buf.0)[x as usize + (y as usize * buf.1)] = m;
170                },
171                Some(None) => {
172                    return Ok(());
173                }
174                _ => {
175                    bail!("Unexpected response");
176                }
177            }
178        }
179    }
180
181    fn make_buffer(&self) -> Self::Buffer {
182        let v = vec![false; (self.size.0 * self.size.1) as usize];
183        MitouOscBuffer(v, self.size.0 as usize)
184    }
185}
186
187impl PauliGate for MitouOscLayer {}
188impl HGate for MitouOscLayer {}
189impl SGate for MitouOscLayer {}
190impl TGate for MitouOscLayer {}
191impl CXGate for MitouOscLayer {}
192
193#[derive(Debug, PartialEq, Eq)]
194pub struct MitouOscBuffer(Vec<bool>, usize);
195
196impl Measured for MitouOscBuffer {
197    type Slot = (u32, u32);
198    fn get(&self, pos: (u32, u32)) -> bool {
199        let (x, y) = pos;
200        (self.0)[self.1 * (y as usize) + (x as usize)]
201    }
202}
203
204fn exec(size: (u32, u32), device_tx: SocketAddr, device_rx: SocketAddr) -> anyhow::Result<MitouOscLayer>
205{
206    let (req_tx, req_rx) = mpsc::channel(SEND_QUEUE_LEN);
207    let (meas_tx, meas_rx) = mpsc::channel(RECV_QUEUE_LEN);
208    Ok(MitouOscLayer {
209        handle: task::spawn(async move {
210            let device_comm = task::spawn(device_comm_loop(device_tx, device_rx, req_rx, meas_tx));
211
212            device_comm.await??;
213            Ok(())
214        }),
215        size,
216        sender: req_tx,
217        receiver: meas_rx,})
218}