1use std::convert::TryFrom;
2use std::net::SocketAddr;
3
4use tokio::task::{self, JoinHandle};
5use tokio::net::UdpSocket;
6use tokio::sync::mpsc;
7
8use anyhow::{anyhow, bail, ensure};
9
10#[allow(unused_imports)]
11use log::{LevelFilter, info, warn};
12
13use message::{Response, Request};
14use rosc::{OscMessage, OscPacket};
15
16use lay::{
17 Layer,
18 Measured,
19 operations::{opid, OpArgs},
20 gates::{PauliGate, HGate, SGate, TGate, CXGate}
21};
22
23pub mod message;
24
25const SEND_QUEUE_LEN: usize = 1000;
26const RECV_QUEUE_LEN: usize = 1000;
27const OSC_BUF_LEN: usize = 1000;
28
29async fn device_comm_loop(tx_addr: SocketAddr,
30 rx_addr: SocketAddr,
31 mut req_rx: mpsc::Receiver<Option<Request>>,
32 meas_tx: mpsc::Sender<Option<((u32, u32), bool)>>) -> anyhow::Result<()> {
33 let rx_sock = UdpSocket::bind(rx_addr).await?;
34 let mut buf = vec![0; OSC_BUF_LEN];
35 while let Some(msg) = req_rx.recv().await {
36 info!("device_sender_loop: Received from channel: {:?}", msg);
37 match msg {
38 Some(msg) => {
39 let packet = rosc::encoder::encode(&OscPacket::Message(OscMessage::from(&msg))
40 ).map_err(|e| anyhow!("{:?}", e))?;
41 rx_sock.send_to(&packet, tx_addr).await?;
42 if let Request::Mz(x, y) = msg {
43 let res = receive_response(&mut buf, &rx_sock).await?;
44 info!("Received from device: {:?}", res);
45 let measured = match res { Response::Mz(_, f) => (f as u32) == 1 };
46 meas_tx.send(Some(((x as u32, y as u32), measured))).await?;
47 }
48 },
49 None => {
50 meas_tx.send(None).await?;
51 },
52 }
53 }
54 bail!("device_sender_loop unexpected finished");
55}
56
57async fn receive_response(buf: &mut Vec<u8>, sock: &UdpSocket) -> anyhow::Result<Response> {
58 let len = sock.recv(buf).await?;
59 let packet = rosc::decoder::decode(&buf[..len]).map_err(|e| anyhow!("{:?}", e))?;
60 let msg = Response::try_from(match packet {
61 OscPacket::Message(msg) => {
62 warn!("Message without Bundle");
63 msg
64 },
65 OscPacket::Bundle(mut bundle) => {
66 ensure!(bundle.content.len() != 0, "Received empty bundle.");
67 ensure!(bundle.content.len() == 1, "Multiple messages in same bundle.");
68 match bundle.content.pop().unwrap() {
69 OscPacket::Message(msg) => msg,
70 OscPacket::Bundle(_bundle) => bail!("Received nested bundle.")
71 }
72 }
73 })?;
74 Ok(msg)
75}
76
77#[derive(Debug)]
78pub struct MitouOscLayer {
79 handle: JoinHandle<anyhow::Result<()>>,
80 size: (u32, u32),
81 sender: mpsc::Sender<Option<Request>>,
82 receiver: mpsc::Receiver<Option<((u32, u32), bool)>>,
83}
84
85impl MitouOscLayer {
86 pub fn exec(size: (u32, u32), device_tx: SocketAddr, device_rx: SocketAddr)
87 -> anyhow::Result<MitouOscLayer> {
88 exec(size, device_tx, device_rx)
89 }
90}
91
92impl Drop for MitouOscLayer {
93 fn drop(&mut self) {
94 self.handle.abort();
95 }
96}
97
98impl Layer for MitouOscLayer {
99 type Operation = OpArgs<Self>;
100 type Qubit = (u32, u32);
101 type Slot = (u32, u32);
102 type Buffer = MitouOscBuffer;
103 type Requested = anyhow::Result<()>;
104 type Response = anyhow::Result<()>;
105
106 fn send(&mut self, ops: &[Self::Operation]) -> Self::Requested {
107 for op in ops {
108 match op {
109 OpArgs::Empty(id) if *id == opid::INIT => {
110 for y in 0..(self.size.1 as i32) {
111 for x in 0..(self.size.0 as i32) {
112 self.sender.blocking_send(Some(Request::InitZero(x, y)))?;
113 }
114 }
115 }
116 OpArgs::Q(id, q) => {
117 let x = q.0 as i32;
118 let y = q.1 as i32;
119 match *id {
120 opid::X => {
121 self.sender.blocking_send(Some(Request::X(x, y)))?;
122 },
123 opid::Y => {
124 self.sender.blocking_send(Some(Request::Y(x, y)))?;
125 },
126 opid::Z => {
127 self.sender.blocking_send(Some(Request::Z(x, y)))?;
128 },
129 opid::S => {
130 self.sender.blocking_send(Some(Request::S(x, y)))?;
131 },
132 opid::SDG => {
133 self.sender.blocking_send(Some(Request::Sdg(x, y)))?;
134 },
135 opid::T => {
136 self.sender.blocking_send(Some(Request::T(x, y)))?;
137 },
138 opid::TDG => {
139 self.sender.blocking_send(Some(Request::Tdg(x, y)))?;
140 },
141 _ => {
142 bail!("Unexpected single qubit gate");
143 }
144 }
145 },
146 OpArgs::QS(id, q, s) if *id == opid::MEAS => {
147 ensure!(q == s, "Qubit and slot must be same.");
148 let x = q.0 as i32;
149 let y = q.1 as i32;
150 self.sender.blocking_send(Some(Request::Mz(x, y)))?;
151 },
152 OpArgs::QQ(id, c, t) if *id == opid::CX => {
153 self.sender
154 .blocking_send(Some(Request::CX(c.0 as i32, c.1 as i32, t.0 as i32, t.1 as i32)))?;
155 },
156 _ => {
157 bail!("Unexpected operation");
158 }
159 }
160 }
161 self.sender.blocking_send(None)?;
162 Ok(())
163 }
164
165 fn receive(&mut self, buf: &mut Self::Buffer) -> Self::Response {
166 loop {
167 match self.receiver.blocking_recv() {
168 Some(Some(((x, y), m))) => {
169 (buf.0)[x as usize + (y as usize * buf.1)] = m;
170 },
171 Some(None) => {
172 return Ok(());
173 }
174 _ => {
175 bail!("Unexpected response");
176 }
177 }
178 }
179 }
180
181 fn make_buffer(&self) -> Self::Buffer {
182 let v = vec![false; (self.size.0 * self.size.1) as usize];
183 MitouOscBuffer(v, self.size.0 as usize)
184 }
185}
186
187impl PauliGate for MitouOscLayer {}
188impl HGate for MitouOscLayer {}
189impl SGate for MitouOscLayer {}
190impl TGate for MitouOscLayer {}
191impl CXGate for MitouOscLayer {}
192
193#[derive(Debug, PartialEq, Eq)]
194pub struct MitouOscBuffer(Vec<bool>, usize);
195
196impl Measured for MitouOscBuffer {
197 type Slot = (u32, u32);
198 fn get(&self, pos: (u32, u32)) -> bool {
199 let (x, y) = pos;
200 (self.0)[self.1 * (y as usize) + (x as usize)]
201 }
202}
203
204fn exec(size: (u32, u32), device_tx: SocketAddr, device_rx: SocketAddr) -> anyhow::Result<MitouOscLayer>
205{
206 let (req_tx, req_rx) = mpsc::channel(SEND_QUEUE_LEN);
207 let (meas_tx, meas_rx) = mpsc::channel(RECV_QUEUE_LEN);
208 Ok(MitouOscLayer {
209 handle: task::spawn(async move {
210 let device_comm = task::spawn(device_comm_loop(device_tx, device_rx, req_rx, meas_tx));
211
212 device_comm.await??;
213 Ok(())
214 }),
215 size,
216 sender: req_tx,
217 receiver: meas_rx,})
218}