1use std::collections::HashMap;
16
17use num_traits::ToPrimitive;
18use serde_derive::{Deserialize, Serialize};
19use serde_json::{json, Value};
20
21use super::super::memory::Payload;
22use super::json::*;
23use super::status::*;
24use super::uuid::*;
25
/// Data-less unit type used purely as a namespace: the message `type`
/// strings of the protocol are declared as associated constants on it.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Command;
29
// String values placed in (requests) or expected in (replies) the `type`
// field of protocol messages. `dead_code` is allowed because, at least in
// the visible part of this file, not every constant is referenced.
#[allow(dead_code)]
impl Command {
    // Registration and exit.
    pub const REGISTER_REQUEST: &'static str = "register_request";
    pub const REGISTER_REPLY: &'static str = "register_reply";
    pub const EXIT_REQUEST: &'static str = "exit_request";
    pub const EXIT_REPLY: &'static str = "exit_reply";

    // Buffers. Note that the seal constants map to "seal_request" /
    // "seal_reply" (no "buffer" in the wire string).
    pub const CREATE_BUFFER_REQUEST: &'static str = "create_buffer_request";
    pub const CREATE_BUFFER_REPLY: &'static str = "create_buffer_reply";
    pub const CREATE_DISK_BUFFER_REQUEST: &'static str = "create_disk_buffer_request";
    pub const CREATE_DISK_BUFFER_REPLY: &'static str = "create_disk_buffer_reply";
    pub const CREATE_GPU_BUFFER_REQUEST: &'static str = "create_gpu_buffer_request";
    pub const CREATE_GPU_BUFFER_REPLY: &'static str = "create_gpu_buffer_reply";
    pub const SEAL_BUFFER_REQUEST: &'static str = "seal_request";
    pub const SEAL_BUFFER_REPLY: &'static str = "seal_reply";
    pub const GET_BUFFERS_REQUEST: &'static str = "get_buffers_request";
    pub const GET_BUFFERS_REPLY: &'static str = "get_buffers_reply";
    pub const GET_GPU_BUFFERS_REQUEST: &'static str = "get_gpu_buffers_request";
    pub const GET_GPU_BUFFERS_REPLY: &'static str = "get_gpu_buffers_reply";
    pub const DROP_BUFFER_REQUEST: &'static str = "drop_buffer_request";
    pub const DROP_BUFFER_REPLY: &'static str = "drop_buffer_reply";

    pub const REQUEST_FD_REQUEST: &'static str = "request_fd_request";
    pub const REQUEST_FD_REPLY: &'static str = "request_fd_reply";

    // Remote buffers: only request names are declared here; the readers in
    // this file reuse the plain buffer reply readers for them.
    pub const CREATE_REMOTE_BUFFER_REQUEST: &'static str = "create_remote_buffer_request";
    pub const GET_REMOTE_BUFFERS_REQUEST: &'static str = "get_remote_buffers_request";

    // Reference counting and release.
    pub const INCREASE_REFERENCE_COUNT_REQUEST: &'static str = "increase_reference_count_request";
    pub const INCREASE_REFERENCE_COUNT_REPLY: &'static str = "increase_reference_count_reply";
    pub const RELEASE_REQUEST: &'static str = "release_request";
    pub const RELEASE_REPLY: &'static str = "release_reply";
    pub const DEL_DATA_WITH_FEEDBACKS_REQUEST: &'static str = "del_data_with_feedbacks_request";
    pub const DEL_DATA_WITH_FEEDBACKS_REPLY: &'static str = "del_data_with_feedbacks_reply";

    // Plasma-flavoured variants of the buffer commands.
    pub const CREATE_BUFFER_PLASMA_REQUEST: &'static str = "create_buffer_by_plasma_request";
    pub const CREATE_BUFFER_PLASMA_REPLY: &'static str = "create_buffer_by_plasma_reply";
    pub const GET_BUFFERS_PLASMA_REQUEST: &'static str = "get_buffers_by_plasma_request";
    pub const GET_BUFFERS_PLASMA_REPLY: &'static str = "get_buffers_by_plasma_reply";
    pub const PLASMA_SEAL_REQUEST: &'static str = "plasma_seal_request";
    pub const PLASMA_SEAL_REPLY: &'static str = "plasma_seal_reply";
    pub const PLASMA_RELEASE_REQUEST: &'static str = "plasma_release_request";
    pub const PLASMA_RELEASE_REPLY: &'static str = "plasma_release_reply";
    pub const PLASMA_DEL_DATA_REQUEST: &'static str = "plasma_delete_data_request";
    pub const PLASMA_DEL_DATA_REPLY: &'static str = "plasma_delete_data_reply";

    // Data (metadata) commands. The delete constants map to the shortened
    // wire strings "del_data_request" / "del_data_reply".
    pub const CREATE_DATA_REQUEST: &'static str = "create_data_request";
    pub const CREATE_DATA_REPLY: &'static str = "create_data_reply";
    pub const GET_DATA_REQUEST: &'static str = "get_data_request";
    pub const GET_DATA_REPLY: &'static str = "get_data_reply";
    pub const LIST_DATA_REQUEST: &'static str = "list_data_request";
    pub const LIST_DATA_REPLY: &'static str = "list_data_reply";
    pub const DELETE_DATA_REQUEST: &'static str = "del_data_request";
    pub const DELETE_DATA_REPLY: &'static str = "del_data_reply";
    pub const EXISTS_REQUEST: &'static str = "exists_request";
    pub const EXISTS_REPLY: &'static str = "exists_reply";
    pub const PERSIST_REQUEST: &'static str = "persist_request";
    pub const PERSIST_REPLY: &'static str = "persist_reply";
    pub const IF_PERSIST_REQUEST: &'static str = "if_persist_request";
    pub const IF_PERSIST_REPLY: &'static str = "if_persist_reply";
    pub const LABEL_REQUEST: &'static str = "label_request";
    pub const LABEL_REPLY: &'static str = "label_reply";
    pub const CLEAR_REQUEST: &'static str = "clear_request";
    pub const CLEAR_REPLY: &'static str = "clear_reply";

    // Streams.
    pub const CREATE_STREAM_REQUEST: &'static str = "create_stream_request";
    pub const CREATE_STREAM_REPLY: &'static str = "create_stream_reply";
    pub const OPEN_STREAM_REQUEST: &'static str = "open_stream_request";
    pub const OPEN_STREAM_REPLY: &'static str = "open_stream_reply";
    pub const GET_NEXT_STREAM_CHUNK_REQUEST: &'static str = "get_next_stream_chunk_request";
    pub const GET_NEXT_STREAM_CHUNK_REPLY: &'static str = "get_next_stream_chunk_reply";
    pub const PUSH_NEXT_STREAM_CHUNK_REQUEST: &'static str = "push_next_stream_chunk_request";
    pub const PUSH_NEXT_STREAM_CHUNK_REPLY: &'static str = "push_next_stream_chunk_reply";
    pub const PULL_NEXT_STREAM_CHUNK_REQUEST: &'static str = "pull_next_stream_chunk_request";
    pub const PULL_NEXT_STREAM_CHUNK_REPLY: &'static str = "pull_next_stream_chunk_reply";
    pub const STOP_STREAM_REQUEST: &'static str = "stop_stream_request";
    pub const STOP_STREAM_REPLY: &'static str = "stop_stream_reply";
    pub const DROP_STREAM_REQUEST: &'static str = "drop_stream_request";
    pub const DROP_STREAM_REPLY: &'static str = "drop_stream_reply";

    // Names.
    pub const PUT_NAME_REQUEST: &'static str = "put_name_request";
    pub const PUT_NAME_REPLY: &'static str = "put_name_reply";
    pub const GET_NAME_REQUEST: &'static str = "get_name_request";
    pub const GET_NAME_REPLY: &'static str = "get_name_reply";
    pub const LIST_NAME_REQUEST: &'static str = "list_name_request";
    pub const LIST_NAME_REPLY: &'static str = "list_name_reply";
    pub const DROP_NAME_REQUEST: &'static str = "drop_name_request";
    pub const DROP_NAME_REPLY: &'static str = "drop_name_reply";

    // Arenas.
    pub const MAKE_ARENA_REQUEST: &'static str = "make_arena_request";
    pub const MAKE_ARENA_REPLY: &'static str = "make_arena_reply";
    pub const FINALIZE_ARENA_REQUEST: &'static str = "finalize_arena_request";
    pub const FINALIZE_ARENA_REPLY: &'static str = "finalize_arena_reply";

    // Sessions.
    pub const NEW_SESSION_REQUEST: &'static str = "new_session_request";
    pub const NEW_SESSION_REPLY: &'static str = "new_session_reply";
    pub const DELETE_SESSION_REQUEST: &'static str = "delete_session_request";
    pub const DELETE_SESSION_REPLY: &'static str = "delete_session_reply";

    pub const MOVE_BUFFERS_OWNERSHIP_REQUEST: &'static str = "move_buffers_ownership_request";
    pub const MOVE_BUFFERS_OWNERSHIP_REPLY: &'static str = "move_buffers_ownership_reply";

    // Eviction, loading, pinning and usage queries.
    pub const EVICT_REQUEST: &'static str = "evict_request";
    pub const EVICT_REPLY: &'static str = "evict_reply";
    pub const LOAD_REQUEST: &'static str = "load_request";
    pub const LOAD_REPLY: &'static str = "load_reply";
    pub const UNPIN_REQUEST: &'static str = "unpin_request";
    pub const UNPIN_REPLY: &'static str = "unpin_reply";
    pub const IS_SPILLED_REQUEST: &'static str = "is_spilled_request";
    pub const IS_SPILLED_REPLY: &'static str = "is_spilled_reply";
    pub const IS_IN_USE_REQUEST: &'static str = "is_in_use_request";
    pub const IS_IN_USE_REPLY: &'static str = "is_in_use_reply";

    // Miscellaneous.
    // NOTE(review): the cluster-meta request and reply share the same string
    // "cluster_meta", and the debug request is "debug_command" rather than
    // "debug_request" — presumably mirroring the server; confirm before
    // "normalizing" either.
    pub const CLUSTER_META_REQUEST: &'static str = "cluster_meta";
    pub const CLUSTER_META_REPLY: &'static str = "cluster_meta";
    pub const INSTANCE_STATUS_REQUEST: &'static str = "instance_status_request";
    pub const INSTANCE_STATUS_REPLY: &'static str = "instance_status_reply";
    pub const MIGRATE_OBJECT_REQUEST: &'static str = "migrate_object_request";
    pub const MIGRATE_OBJECT_REPLY: &'static str = "migrate_object_reply";
    pub const SHALLOW_COPY_REQUEST: &'static str = "shallow_copy_request";
    pub const SHALLOW_COPY_REPLY: &'static str = "shallow_copy_reply";
    pub const DEBUG_REQUEST: &'static str = "debug_command";
    pub const DEBUG_REPLY: &'static str = "debug_reply";
}
162
163fn check_ipc_error<'a>(root: &'a JSON, reply_type: &str) -> Result<()> {
164 if root.contains_key("code") {
165 let code = root["code"].as_u64().unwrap_or(0);
166 if code != 0 {
167 let mut error_message: String = "unable to find error message in the response".into();
168 if let Some(message) = root.get("message") {
169 if let Some(message) = message.as_str() {
170 error_message = message.into();
171 }
172 }
173 return Err(VineyardError::new(
174 unsafe { std::mem::transmute(code as u8) },
175 error_message,
176 ));
177 }
178 }
179 if let Some(message_type) = root.get("type") {
180 return vineyard_assert(
181 message_type.as_str().map_or(false, |t| t == reply_type),
182 format!("unexpected reply type: '{}'", message_type),
183 );
184 } else {
185 return vineyard_assert(false, "no 'type' field in the response");
186 }
187}
188
/// Values that `write_register_request` copies, field by field and under the
/// same names, into a `register_request` message.
#[derive(Debug, Default)]
pub struct RegisterRequest {
    /// Sent as the `version` field.
    pub version: String,
    /// Sent as the `store_type` field.
    pub store_type: String,
    /// Sent as the `session_id` field.
    pub session_id: i64,
    /// Sent as the `username` field.
    pub username: String,
    /// Sent as the `password` field.
    pub password: String,
    /// Sent as the `support_rpc_compression` field.
    pub support_rpc_compression: bool,
}
198
199pub fn write_register_request(r: RegisterRequest) -> JSONResult<String> {
200 return serde_json::to_string(&json!({
201 "type": Command::REGISTER_REQUEST,
202 "version": r.version,
203 "store_type": r.store_type,
204 "session_id": r.session_id,
205 "username": r.username,
206 "password": r.password,
207 "support_rpc_compression": r.support_rpc_compression,
208 }));
209}
210
/// Fields extracted from a `register_reply` message by `read_register_reply`.
#[derive(Debug, Default)]
pub struct RegisterReply {
    /// Value of the reply's `ipc_socket` field.
    pub ipc_socket: String,
    /// Value of the reply's `rpc_endpoint` field.
    pub rpc_endpoint: String,
    /// Value of the reply's `instance_id` field.
    pub instance_id: InstanceID,
    /// Value of the reply's `version` field.
    pub version: String,
    /// Value of the reply's `support_rpc_compression` field; `false` when the
    /// reader cannot obtain it from the reply.
    pub support_rpc_compression: bool,
}
219
220pub fn read_register_reply(message: &str) -> Result<RegisterReply> {
221 let root: Value = serde_json::from_str(message)?;
222 let root = parse_json_object(&root)?;
223 check_ipc_error(&root, Command::REGISTER_REPLY)?;
224
225 return Ok(RegisterReply {
226 ipc_socket: get_string(root, "ipc_socket")?.into(),
227 rpc_endpoint: get_string(root, "rpc_endpoint")?.into(),
228 instance_id: get_uint(root, "instance_id")?,
229 version: get_string(root, "version")?.into(),
230 support_rpc_compression: get_bool_or(root, "support_rpc_compression", false),
231 });
232}
233
234pub fn write_exit_request() -> JSONResult<String> {
235 return serde_json::to_string(&json!({
236 "type": Command::EXIT_REQUEST,
237 }));
238}
239
/// Result of the buffer-creation reply readers in this file.
#[derive(Debug, Default)]
pub struct CreateBufferReply {
    /// Value of the reply's `id` field.
    pub id: ObjectID,
    /// Payload parsed from the reply's `created` object.
    pub payload: Payload,
    /// Value of the reply's `fd` field (presumably a file descriptor —
    /// confirm against the server side).
    pub fd: i32,
}
246
247pub fn write_create_buffer_request(size: usize) -> JSONResult<String> {
248 return serde_json::to_string(&json!({
249 "type": Command::CREATE_BUFFER_REQUEST,
250 "size": size,
251 }));
252}
253
254pub fn read_create_buffer_reply(message: &str) -> Result<CreateBufferReply> {
255 let root: Value = serde_json::from_str(message)?;
256 let root = parse_json_object(&root)?;
257 check_ipc_error(&root, Command::CREATE_BUFFER_REPLY)?;
258
259 let created = parse_json_object(&root["created"])?;
260 let payload = Payload::from_json(&created)?;
261
262 return Ok(CreateBufferReply {
263 id: get_uint(root, "id")?,
264 payload: payload,
265 fd: get_int32::<i32>(root, "fd")?,
266 });
267}
268
269pub fn write_create_disk_buffer_request(size: usize, path: &str) -> JSONResult<String> {
270 return serde_json::to_string(&json!({
271 "type": Command::CREATE_DISK_BUFFER_REQUEST,
272 "size": size,
273 "path": path,
274 }));
275}
276
277pub fn read_create_disk_buffer_reply(message: &str) -> Result<CreateBufferReply> {
278 let root: Value = serde_json::from_str(message)?;
279 let root = parse_json_object(&root)?;
280 check_ipc_error(&root, Command::CREATE_DISK_BUFFER_REPLY)?;
281
282 let created = parse_json_object(&root["created"])?;
283 let payload = Payload::from_json(&created)?;
284
285 return Ok(CreateBufferReply {
286 id: get_uint(root, "id")?,
287 payload: payload,
288 fd: get_int::<i64>(root, "fd")?
289 .to_i32()
290 .ok_or(VineyardError::io_error(
291 "fd received from server must be a 32-bit integer",
292 ))?,
293 });
294}
295
/// Result of `read_create_gpu_buffer_reply`.
#[derive(Debug, Default)]
pub struct CreateGPUBufferReply {
    /// Value of the reply's `id` field.
    pub id: ObjectID,
    /// Payload parsed from the reply's `created` object.
    pub payload: Payload,
    /// Integers of the reply's `handle` array, in message order (their
    /// meaning is not visible in this file — TODO confirm).
    pub handle: Vec<i64>,
}
302
303pub fn write_create_gpu_buffer_request(size: usize) -> JSONResult<String> {
304 return serde_json::to_string(&json!({
305 "type": Command::CREATE_GPU_BUFFER_REQUEST,
306 "size": size,
307 }));
308}
309
310pub fn read_create_gpu_buffer_reply(message: &str) -> Result<CreateGPUBufferReply> {
311 let root: Value = serde_json::from_str(message)?;
312 let root = parse_json_object(&root)?;
313 check_ipc_error(&root, Command::CREATE_GPU_BUFFER_REPLY)?;
314
315 let created = parse_json_object(&root["created"])?;
316 let payload = Payload::from_json(&created)?;
317
318 let handle = root["handle"]
319 .as_array()
320 .ok_or(VineyardError::io_error("handle is not an array"))?
321 .iter()
322 .map(|v| {
323 v.as_i64()
324 .ok_or(VineyardError::io_error("handle is not an integer"))
325 })
326 .collect::<Result<Vec<i64>>>()?;
327
328 return Ok(CreateGPUBufferReply {
329 id: get_uint(root, "id")?,
330 payload: payload,
331 handle: handle,
332 });
333}
334
335pub fn write_seal_request(id: ObjectID) -> JSONResult<String> {
336 return serde_json::to_string(&json!({
337 "type": Command::SEAL_BUFFER_REQUEST,
338 "object_id": id,
339 }));
340}
341
342pub fn read_seal_reply(message: &str) -> Result<()> {
343 let root: Value = serde_json::from_str(message)?;
344 let root = parse_json_object(&root)?;
345 check_ipc_error(&root, Command::SEAL_BUFFER_REPLY)?;
346
347 return Ok(());
348}
349
/// Result of the `get_buffers` reply readers in this file.
#[derive(Debug, Default)]
pub struct GetBuffersReply {
    /// One payload per buffer described in the reply, in message order.
    pub payloads: Vec<Payload>,
    /// Integers of the reply's optional `fds` array; empty when absent.
    pub fds: Vec<i32>,
    /// Compression flag associated with the reply (exact semantics are not
    /// visible in this file — TODO confirm).
    pub compress: bool,
}
356
357pub fn write_get_buffers_request(ids: &[ObjectID], unsafe_: bool) -> JSONResult<String> {
358 return serde_json::to_string(&json!({
359 "type": Command::GET_BUFFERS_REQUEST,
360 "ids": ids,
361 "unsafe": unsafe_,
362 }));
363}
364
365pub fn read_get_buffers_reply(message: &str) -> Result<GetBuffersReply> {
366 let root: Value = serde_json::from_str(message)?;
367 let root = parse_json_object(&root)?;
368 check_ipc_error(&root, Command::GET_BUFFERS_REPLY)?;
369
370 let mut reply = GetBuffersReply::default();
371
372 if let Some(Value::Array(ref payloads)) = root.get("payloads") {
373 for payload in payloads {
374 reply
375 .payloads
376 .push(Payload::from_json(payload.as_object().ok_or(
377 VineyardError::io_error(
378 "invalid get_buffers reply: payload in message is not a JSON object",
379 ),
380 )?)?);
381 }
382 } else {
383 let num: i64 = get_int(root, "num")?;
384 for i in 0..num {
385 match root[&i.to_string()] {
386 Value::Object(ref payload) => {
387 reply.payloads.push(Payload::from_json(payload)?);
388 }
389 _ => {
390 return Err(VineyardError::io_error(
391 "invalid get_buffers reply: payload in message is not a JSON object",
392 ));
393 }
394 }
395 }
396 }
397
398 if let Some(Value::Array(ref fds)) = root.get("fds") {
399 for fd in fds {
400 reply.fds.push(
401 fd.as_i64()
402 .ok_or(VineyardError::io_error("fd is not an integer"))?
403 .to_i32()
404 .ok_or(VineyardError::io_error(
405 "fd received from server must be a 32-bit integer",
406 ))?,
407 );
408 }
409 }
410 return Ok(reply);
411}
412
413pub fn write_drop_buffer_request(id: ObjectID) -> JSONResult<String> {
414 return serde_json::to_string(&json!({
415 "type": Command::DROP_BUFFER_REQUEST,
416 "id": id,
417 }));
418}
419
420pub fn read_drop_buffer_reply(message: &str) -> Result<()> {
421 let root: Value = serde_json::from_str(message)?;
422 let root = parse_json_object(&root)?;
423 check_ipc_error(&root, Command::DROP_BUFFER_REPLY)?;
424
425 return Ok(());
426}
427
428pub fn write_create_remote_buffer_request(size: usize, compress: bool) -> JSONResult<String> {
429 return serde_json::to_string(&json!({
430 "type": Command::CREATE_REMOTE_BUFFER_REQUEST,
431 "size": size,
432 "compress": compress,
433 }));
434}
435
436pub fn read_create_remote_buffer_reply(message: &str) -> Result<CreateBufferReply> {
437 return read_create_buffer_reply(message);
438}
439
440pub fn write_get_remote_buffers_request(ids: &[ObjectID]) -> JSONResult<String> {
441 return serde_json::to_string(&json!({
442 "type": Command::GET_REMOTE_BUFFERS_REQUEST,
443 "ids": ids,
444 }));
445}
446
447pub fn read_get_remote_buffers_reply(message: &str) -> Result<GetBuffersReply> {
448 return read_get_buffers_reply(message);
449}
450
451pub fn write_increase_reference_count_request(id: &[ObjectID]) -> JSONResult<String> {
452 return serde_json::to_string(&json!({
453 "type": Command::INCREASE_REFERENCE_COUNT_REQUEST,
454 "ids": id,
455 }));
456}
457
458pub fn read_increase_reference_count_reply(message: &str) -> Result<()> {
459 let root: Value = serde_json::from_str(message)?;
460 let root = parse_json_object(&root)?;
461 check_ipc_error(&root, Command::INCREASE_REFERENCE_COUNT_REPLY)?;
462
463 return Ok(());
464}
465
466pub fn write_release_request(id: ObjectID) -> JSONResult<String> {
467 return serde_json::to_string(&json!({
468 "type": Command::RELEASE_REQUEST,
469 "object_id": id,
470 }));
471}
472
473pub fn read_release_reply(message: &str) -> Result<()> {
474 let root: Value = serde_json::from_str(message)?;
475 let root = parse_json_object(&root)?;
476 check_ipc_error(&root, Command::RELEASE_REPLY)?;
477
478 return Ok(());
479}
480
/// Fields extracted from a `create_data_reply` message by
/// `read_create_data_reply`.
#[derive(Debug, Default)]
pub struct CreateDataReply {
    /// Value of the reply's `id` field.
    pub id: ObjectID,
    /// Value of the reply's `signature` field.
    pub signature: Signature,
    /// Value of the reply's `instance_id` field.
    pub instance_id: InstanceID,
}
487
488pub fn write_create_data_request(content: &JSON) -> JSONResult<String> {
489 return serde_json::to_string(&json!({
490 "type": Command::CREATE_DATA_REQUEST,
491 "content": content,
492 }));
493}
494
495pub fn read_create_data_reply(message: &str) -> Result<CreateDataReply> {
496 let root: Value = serde_json::from_str(message)?;
497 let root = parse_json_object(&root)?;
498 check_ipc_error(&root, "create_data_reply")?;
499
500 return Ok(CreateDataReply {
501 id: get_uint(root, "id")?,
502 signature: get_uint(root, "signature")?,
503 instance_id: get_uint(root, "instance_id")?,
504 });
505}
506
507pub fn write_get_data_request(id: ObjectID, sync_remote: bool, wait: bool) -> JSONResult<String> {
508 return serde_json::to_string(&json!({
509 "type": Command::GET_DATA_REQUEST,
510 "id": vec![id],
511 "sync_remote": sync_remote,
512 "wait": wait,
513 }));
514}
515
516pub fn read_get_data_reply(message: &str) -> Result<JSON> {
517 let root: Value = serde_json::from_str(message)?;
518 let root = parse_json_object(&root)?;
519 check_ipc_error(&root, "get_data_reply")?;
520
521 match root["content"] {
522 Value::Array(ref content) => {
523 if content.len() != 1 {
524 return Err(VineyardError::io_error(
525 "failed to read get_data reply: content array's length is not 1",
526 ));
527 }
528 return Ok(parse_json_object(&content[0])?.clone());
529 }
530 Value::Object(ref content) => match content.iter().next() {
531 None => {
532 return Err(VineyardError::io_error(
533 "failed to read get_data reply: content dict's length is not 1",
534 ));
535 }
536 Some((_, meta)) => {
537 return Ok(parse_json_object(meta)?.clone());
538 }
539 },
540 _ => {
541 return Err(VineyardError::io_error(
542 "failed to read get_data reply: content is not an array or a dict",
543 ));
544 }
545 }
546}
547
548pub fn write_get_data_batch_request(
549 ids: &[ObjectID],
550 sync_remote: bool,
551 wait: bool,
552) -> JSONResult<String> {
553 return serde_json::to_string(&json!({
554 "type": Command::GET_DATA_REQUEST,
555 "id": ids,
556 "sync_remote": sync_remote,
557 "wait": wait,
558 }));
559}
560
561pub fn read_get_data_batch_reply(message: &str) -> Result<HashMap<ObjectID, JSON>> {
562 let root: Value = serde_json::from_str(message)?;
563 let root = parse_json_object(&root)?;
564 check_ipc_error(&root, "get_data_reply")?;
565
566 match root["content"] {
567 Value::Array(ref content) => {
568 let mut data = HashMap::new();
569 for item in content {
570 let object = parse_json_object(&item)?;
571 data.insert(
572 object_id_from_string(get_string(object, "id")?)?,
573 object.clone(),
574 );
575 }
576 return Ok(data);
577 }
578 Value::Object(ref content) => {
579 let mut data = HashMap::new();
580 for (id, object) in content.iter() {
581 data.insert(
582 object_id_from_string(id)?,
583 parse_json_object(object)?.clone(),
584 );
585 }
586 return Ok(data);
587 }
588 _ => {
589 return Err(VineyardError::io_error(
590 "failed to read get_data reply: content is not an array or a dict",
591 ));
592 }
593 }
594}
595
596pub fn write_list_data_request(pattern: &str, regex: bool, limit: usize) -> JSONResult<String> {
597 return serde_json::to_string(&json!({
598 "type": Command::LIST_DATA_REQUEST,
599 "pattern": pattern,
600 "regex": regex,
601 "limit": limit,
602 }));
603}
604
605pub fn read_list_data_reply(message: &str) -> Result<Vec<JSON>> {
606 let root: Value = serde_json::from_str(message)?;
607 let root = parse_json_object(&root)?;
608 check_ipc_error(&root, "list_data_reply")?;
609
610 let mut reply = Vec::new();
611 match root["content"] {
612 Value::Array(ref data) => {
613 for item in data {
614 reply.push(parse_json_object(&item)?.clone());
615 }
616 return Ok(reply);
617 }
618 _ => {
619 return Err(VineyardError::io_error(
620 "failed to read list_data reply: data is not an array",
621 ));
622 }
623 }
624}
625
626pub fn write_delete_data_request(
627 id: ObjectID,
628 force: bool,
629 deep: bool,
630 fastpath: bool,
631) -> JSONResult<String> {
632 return serde_json::to_string(&json!({
633 "type": Command::DELETE_DATA_REQUEST,
634 "id": vec![id],
635 "force": force,
636 "deep:": deep,
637 "fastpath": fastpath,
638 }));
639}
640
641pub fn write_delete_data_batch_request(
642 ids: &[ObjectID],
643 force: bool,
644 deep: bool,
645 fastpath: bool,
646) -> JSONResult<String> {
647 return serde_json::to_string(&json!({
648 "type": Command::DELETE_DATA_REQUEST,
649 "id": ids,
650 "force": force,
651 "deep:": deep,
652 "fastpath": fastpath,
653 }));
654}
655
656pub fn read_delete_data_reply(message: &str) -> Result<()> {
657 let root: Value = serde_json::from_str(message)?;
658 let root = parse_json_object(&root)?;
659 check_ipc_error(&root, Command::DELETE_DATA_REPLY)?;
660
661 return Ok(());
662}
663
664pub fn write_exists_request(id: ObjectID) -> JSONResult<String> {
665 return serde_json::to_string(&json!({
666 "type": Command::EXISTS_REQUEST,
667 "id": id,
668 }));
669}
670
671pub fn read_exists_reply(message: &str) -> Result<bool> {
672 let root: Value = serde_json::from_str(message)?;
673 let root = parse_json_object(&root)?;
674 check_ipc_error(&root, Command::EXISTS_REPLY)?;
675
676 return Ok(get_bool_or(root, "exists", false));
677}
678
679pub fn write_persist_request(id: ObjectID) -> JSONResult<String> {
680 return serde_json::to_string(&json!({
681 "type": Command::PERSIST_REQUEST,
682 "id": id,
683 }));
684}
685
686pub fn read_persist_reply(message: &str) -> Result<()> {
687 let root: Value = serde_json::from_str(message)?;
688 let root = parse_json_object(&root)?;
689 check_ipc_error(&root, Command::PERSIST_REPLY)?;
690
691 return Ok(());
692}
693
694pub fn write_if_persist_request(id: ObjectID) -> JSONResult<String> {
695 return serde_json::to_string(&json!({
696 "type": Command::IF_PERSIST_REQUEST,
697 "id": id,
698 }));
699}
700
701pub fn read_if_persist_reply(message: &str) -> Result<bool> {
702 let root: Value = serde_json::from_str(message)?;
703 let root = parse_json_object(&root)?;
704 check_ipc_error(&root, Command::IF_PERSIST_REPLY)?;
705
706 return Ok(get_bool_or(root, "persist", false));
707}
708
709pub fn write_label_request(id: ObjectID, keys: &[String], values: &[String]) -> JSONResult<String> {
710 return serde_json::to_string(&json!({
711 "type": Command::LABEL_REQUEST,
712 "id": id,
713 "keys": keys,
714 "values": values,
715 }));
716}
717
718pub fn read_label_reply(message: &str) -> Result<()> {
719 let root: Value = serde_json::from_str(message)?;
720 let root = parse_json_object(&root)?;
721 check_ipc_error(&root, Command::LABEL_REPLY)?;
722
723 return Ok(());
724}
725
726pub fn write_clear_request() -> JSONResult<String> {
727 return serde_json::to_string(&json!({
728 "type": Command::CLEAR_REQUEST,
729 }));
730}
731
732pub fn read_clear_reply(message: &str) -> Result<()> {
733 let root: Value = serde_json::from_str(message)?;
734 let root = parse_json_object(&root)?;
735 check_ipc_error(&root, Command::CLEAR_REPLY)?;
736
737 return Ok(());
738}
739
740pub fn write_put_name_request(object_id: ObjectID, name: &str) -> JSONResult<String> {
741 return serde_json::to_string(&json!({
742 "type": Command::PUT_NAME_REQUEST,
743 "object_id": object_id,
744 "name": name,
745 }));
746}
747
748pub fn read_put_name_reply(message: &str) -> Result<()> {
749 let root: Value = serde_json::from_str(message)?;
750 let root = parse_json_object(&root)?;
751 check_ipc_error(&root, Command::PUT_NAME_REPLY)?;
752
753 return Ok(());
754}
755
756pub fn write_get_name_request(name: &str, wait: bool) -> JSONResult<String> {
757 return serde_json::to_string(&json!({
758 "type": Command::GET_NAME_REQUEST,
759 "name": name,
760 "wait": wait,
761 }));
762}
763
764pub fn read_get_name_reply(message: &str) -> Result<ObjectID> {
765 let root: Value = serde_json::from_str(message)?;
766 let root = parse_json_object(&root)?;
767 check_ipc_error(&root, Command::GET_NAME_REPLY)?;
768
769 return get_uint(root, "object_id");
770}
771
772pub fn write_list_name_request(pattern: &str, regex: bool, limit: usize) -> JSONResult<String> {
773 return serde_json::to_string(&json!({
774 "type": Command::LIST_NAME_REQUEST,
775 "pattern": pattern,
776 "regex": regex,
777 "limit": limit,
778 }));
779}
780
781pub fn read_list_name_reply(message: &str) -> Result<HashMap<String, ObjectID>> {
782 let root: Value = serde_json::from_str(message)?;
783 let root = parse_json_object(&root)?;
784 check_ipc_error(&root, Command::LIST_NAME_REPLY)?;
785
786 let names = parse_json_object(
787 root.get("names")
788 .ok_or(VineyardError::io_error("message does not contain names"))?,
789 )?;
790 let mut result = HashMap::new();
791 for (name, value) in names {
792 match value.as_u64() {
793 None => {}
794 Some(id) => {
795 result.insert(name.clone(), id);
796 }
797 }
798 }
799 return Ok(result);
800}
801
802pub fn write_drop_name_request(name: &str) -> JSONResult<String> {
803 return serde_json::to_string(&json!({
804 "type": Command::DROP_NAME_REQUEST,
805 "name": name,
806 }));
807}
808
809pub fn read_drop_name_reply(message: &str) -> Result<()> {
810 let root: Value = serde_json::from_str(message)?;
811 let root = parse_json_object(&root)?;
812 check_ipc_error(&root, Command::DROP_NAME_REPLY)?;
813
814 return Ok(());
815}
816
817pub fn write_evict_request(ids: &[ObjectID]) -> JSONResult<String> {
818 return serde_json::to_string(&json!({
819 "type": Command::EVICT_REQUEST,
820 "ids": ids,
821 }));
822}
823
824pub fn read_evict_reply(message: &str) -> Result<()> {
825 let root: Value = serde_json::from_str(message)?;
826 let root = parse_json_object(&root)?;
827 check_ipc_error(&root, Command::EVICT_REPLY)?;
828
829 return Ok(());
830}
831
832pub fn write_load_request(ids: &[ObjectID], pin: bool) -> JSONResult<String> {
833 return serde_json::to_string(&json!({
834 "type": Command::LOAD_REQUEST,
835 "ids": ids,
836 "pin": pin,
837 }));
838}
839
840pub fn read_load_reply(message: &str) -> Result<()> {
841 let root: Value = serde_json::from_str(message)?;
842 let root = parse_json_object(&root)?;
843 check_ipc_error(&root, Command::LOAD_REPLY)?;
844
845 return Ok(());
846}
847
848pub fn write_unpin_request(ids: &[ObjectID]) -> JSONResult<String> {
849 return serde_json::to_string(&json!({
850 "type": Command::UNPIN_REQUEST,
851 "ids": ids,
852 }));
853}
854
855pub fn read_unpin_reply(message: &str) -> Result<()> {
856 let root: Value = serde_json::from_str(message)?;
857 let root = parse_json_object(&root)?;
858 check_ipc_error(&root, Command::UNPIN_REPLY)?;
859
860 return Ok(());
861}
862
863pub fn write_is_spilled_request(id: ObjectID) -> JSONResult<String> {
864 return serde_json::to_string(&json!({
865 "type": Command::IS_SPILLED_REQUEST,
866 "id": id,
867 }));
868}
869
870pub fn read_is_spilled_reply(message: &str) -> Result<bool> {
871 let root: Value = serde_json::from_str(message)?;
872 let root = parse_json_object(&root)?;
873 check_ipc_error(&root, Command::IS_SPILLED_REPLY)?;
874
875 return Ok(get_bool_or(root, "is_spilled", false));
876}
877
878pub fn write_is_inuse_request(id: ObjectID) -> JSONResult<String> {
879 return serde_json::to_string(&json!({
880 "type": Command::IS_IN_USE_REQUEST,
881 "id": id,
882 }));
883}
884
885pub fn read_is_inuse_reply(message: &str) -> Result<bool> {
886 let root: Value = serde_json::from_str(message)?;
887 let root = parse_json_object(&root)?;
888 check_ipc_error(&root, Command::IS_IN_USE_REPLY)?;
889
890 return Ok(get_bool_or(root, "is_in_use", false));
891}
892
893pub fn write_migrate_object_request(id: ObjectID) -> JSONResult<String> {
894 return serde_json::to_string(&json!({
895 "type": Command::MIGRATE_OBJECT_REQUEST,
896 "object_id": id,
897 }));
898}
899
900pub fn read_migrate_object_reply(message: &str) -> Result<ObjectID> {
901 let root: Value = serde_json::from_str(message)?;
902 let root = parse_json_object(&root)?;
903 check_ipc_error(&root, Command::MIGRATE_OBJECT_REPLY)?;
904
905 return get_uint(root, "object_id");
906}