vineyard/common/util/
protocol.rs

1// Copyright 2020-2023 Alibaba Group Holding Limited.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use num_traits::ToPrimitive;
18use serde_derive::{Deserialize, Serialize};
19use serde_json::{json, Value};
20
21use super::super::memory::Payload;
22use super::json::*;
23use super::status::*;
24use super::uuid::*;
25
26#[non_exhaustive]
27#[derive(Debug, Clone, Serialize, Deserialize)]
28struct Command;
29
30#[allow(dead_code)]
31impl Command {
32    pub const REGISTER_REQUEST: &'static str = "register_request";
33    pub const REGISTER_REPLY: &'static str = "register_reply";
34    pub const EXIT_REQUEST: &'static str = "exit_request";
35    pub const EXIT_REPLY: &'static str = "exit_reply";
36
37    // Blobs APIs
38    pub const CREATE_BUFFER_REQUEST: &'static str = "create_buffer_request";
39    pub const CREATE_BUFFER_REPLY: &'static str = "create_buffer_reply";
40    pub const CREATE_DISK_BUFFER_REQUEST: &'static str = "create_disk_buffer_request";
41    pub const CREATE_DISK_BUFFER_REPLY: &'static str = "create_disk_buffer_reply";
42    pub const CREATE_GPU_BUFFER_REQUEST: &'static str = "create_gpu_buffer_request";
43    pub const CREATE_GPU_BUFFER_REPLY: &'static str = "create_gpu_buffer_reply";
44    pub const SEAL_BUFFER_REQUEST: &'static str = "seal_request";
45    pub const SEAL_BUFFER_REPLY: &'static str = "seal_reply";
46    pub const GET_BUFFERS_REQUEST: &'static str = "get_buffers_request";
47    pub const GET_BUFFERS_REPLY: &'static str = "get_buffers_reply";
48    pub const GET_GPU_BUFFERS_REQUEST: &'static str = "get_gpu_buffers_request";
49    pub const GET_GPU_BUFFERS_REPLY: &'static str = "get_gpu_buffers_reply";
50    pub const DROP_BUFFER_REQUEST: &'static str = "drop_buffer_request";
51    pub const DROP_BUFFER_REPLY: &'static str = "drop_buffer_reply";
52
53    pub const REQUEST_FD_REQUEST: &'static str = "request_fd_request";
54    pub const REQUEST_FD_REPLY: &'static str = "request_fd_reply";
55
56    pub const CREATE_REMOTE_BUFFER_REQUEST: &'static str = "create_remote_buffer_request";
57    pub const GET_REMOTE_BUFFERS_REQUEST: &'static str = "get_remote_buffers_request";
58
59    pub const INCREASE_REFERENCE_COUNT_REQUEST: &'static str = "increase_reference_count_request";
60    pub const INCREASE_REFERENCE_COUNT_REPLY: &'static str = "increase_reference_count_reply";
61    pub const RELEASE_REQUEST: &'static str = "release_request";
62    pub const RELEASE_REPLY: &'static str = "release_reply";
63    pub const DEL_DATA_WITH_FEEDBACKS_REQUEST: &'static str = "del_data_with_feedbacks_request";
64    pub const DEL_DATA_WITH_FEEDBACKS_REPLY: &'static str = "del_data_with_feedbacks_reply";
65
66    pub const CREATE_BUFFER_PLASMA_REQUEST: &'static str = "create_buffer_by_plasma_request";
67    pub const CREATE_BUFFER_PLASMA_REPLY: &'static str = "create_buffer_by_plasma_reply";
68    pub const GET_BUFFERS_PLASMA_REQUEST: &'static str = "get_buffers_by_plasma_request";
69    pub const GET_BUFFERS_PLASMA_REPLY: &'static str = "get_buffers_by_plasma_reply";
70    pub const PLASMA_SEAL_REQUEST: &'static str = "plasma_seal_request";
71    pub const PLASMA_SEAL_REPLY: &'static str = "plasma_seal_reply";
72    pub const PLASMA_RELEASE_REQUEST: &'static str = "plasma_release_request";
73    pub const PLASMA_RELEASE_REPLY: &'static str = "plasma_release_reply";
74    pub const PLASMA_DEL_DATA_REQUEST: &'static str = "plasma_delete_data_request";
75    pub const PLASMA_DEL_DATA_REPLY: &'static str = "plasma_delete_data_reply";
76
77    // Metadata APIs
78    pub const CREATE_DATA_REQUEST: &'static str = "create_data_request";
79    pub const CREATE_DATA_REPLY: &'static str = "create_data_reply";
80    pub const GET_DATA_REQUEST: &'static str = "get_data_request";
81    pub const GET_DATA_REPLY: &'static str = "get_data_reply";
82    pub const LIST_DATA_REQUEST: &'static str = "list_data_request";
83    pub const LIST_DATA_REPLY: &'static str = "list_data_reply";
84    pub const DELETE_DATA_REQUEST: &'static str = "del_data_request";
85    pub const DELETE_DATA_REPLY: &'static str = "del_data_reply";
86    pub const EXISTS_REQUEST: &'static str = "exists_request";
87    pub const EXISTS_REPLY: &'static str = "exists_reply";
88    pub const PERSIST_REQUEST: &'static str = "persist_request";
89    pub const PERSIST_REPLY: &'static str = "persist_reply";
90    pub const IF_PERSIST_REQUEST: &'static str = "if_persist_request";
91    pub const IF_PERSIST_REPLY: &'static str = "if_persist_reply";
92    pub const LABEL_REQUEST: &'static str = "label_request";
93    pub const LABEL_REPLY: &'static str = "label_reply";
94    pub const CLEAR_REQUEST: &'static str = "clear_request";
95    pub const CLEAR_REPLY: &'static str = "clear_reply";
96
97    // Stream APIs
98    pub const CREATE_STREAM_REQUEST: &'static str = "create_stream_request";
99    pub const CREATE_STREAM_REPLY: &'static str = "create_stream_reply";
100    pub const OPEN_STREAM_REQUEST: &'static str = "open_stream_request";
101    pub const OPEN_STREAM_REPLY: &'static str = "open_stream_reply";
102    pub const GET_NEXT_STREAM_CHUNK_REQUEST: &'static str = "get_next_stream_chunk_request";
103    pub const GET_NEXT_STREAM_CHUNK_REPLY: &'static str = "get_next_stream_chunk_reply";
104    pub const PUSH_NEXT_STREAM_CHUNK_REQUEST: &'static str = "push_next_stream_chunk_request";
105    pub const PUSH_NEXT_STREAM_CHUNK_REPLY: &'static str = "push_next_stream_chunk_reply";
106    pub const PULL_NEXT_STREAM_CHUNK_REQUEST: &'static str = "pull_next_stream_chunk_request";
107    pub const PULL_NEXT_STREAM_CHUNK_REPLY: &'static str = "pull_next_stream_chunk_reply";
108    pub const STOP_STREAM_REQUEST: &'static str = "stop_stream_request";
109    pub const STOP_STREAM_REPLY: &'static str = "stop_stream_reply";
110    pub const DROP_STREAM_REQUEST: &'static str = "drop_stream_request";
111    pub const DROP_STREAM_REPLY: &'static str = "drop_stream_reply";
112
113    // Names APIs
114    pub const PUT_NAME_REQUEST: &'static str = "put_name_request";
115    pub const PUT_NAME_REPLY: &'static str = "put_name_reply";
116    pub const GET_NAME_REQUEST: &'static str = "get_name_request";
117    pub const GET_NAME_REPLY: &'static str = "get_name_reply";
118    pub const LIST_NAME_REQUEST: &'static str = "list_name_request";
119    pub const LIST_NAME_REPLY: &'static str = "list_name_reply";
120    pub const DROP_NAME_REQUEST: &'static str = "drop_name_request";
121    pub const DROP_NAME_REPLY: &'static str = "drop_name_reply";
122
123    // Arena APIs
124    pub const MAKE_ARENA_REQUEST: &'static str = "make_arena_request";
125    pub const MAKE_ARENA_REPLY: &'static str = "make_arena_reply";
126    pub const FINALIZE_ARENA_REQUEST: &'static str = "finalize_arena_request";
127    pub const FINALIZE_ARENA_REPLY: &'static str = "finalize_arena_reply";
128
129    // Session APIs
130    pub const NEW_SESSION_REQUEST: &'static str = "new_session_request";
131    pub const NEW_SESSION_REPLY: &'static str = "new_session_reply";
132    pub const DELETE_SESSION_REQUEST: &'static str = "delete_session_request";
133    pub const DELETE_SESSION_REPLY: &'static str = "delete_session_reply";
134
135    pub const MOVE_BUFFERS_OWNERSHIP_REQUEST: &'static str = "move_buffers_ownership_request";
136    pub const MOVE_BUFFERS_OWNERSHIP_REPLY: &'static str = "move_buffers_ownership_reply";
137
138    // Spill APIs
139    pub const EVICT_REQUEST: &'static str = "evict_request";
140    pub const EVICT_REPLY: &'static str = "evict_reply";
141    pub const LOAD_REQUEST: &'static str = "load_request";
142    pub const LOAD_REPLY: &'static str = "load_reply";
143    pub const UNPIN_REQUEST: &'static str = "unpin_request";
144    pub const UNPIN_REPLY: &'static str = "unpin_reply";
145    pub const IS_SPILLED_REQUEST: &'static str = "is_spilled_request";
146    pub const IS_SPILLED_REPLY: &'static str = "is_spilled_reply";
147    pub const IS_IN_USE_REQUEST: &'static str = "is_in_use_request";
148    pub const IS_IN_USE_REPLY: &'static str = "is_in_use_reply";
149
150    // Meta APIs
151    pub const CLUSTER_META_REQUEST: &'static str = "cluster_meta";
152    pub const CLUSTER_META_REPLY: &'static str = "cluster_meta";
153    pub const INSTANCE_STATUS_REQUEST: &'static str = "instance_status_request";
154    pub const INSTANCE_STATUS_REPLY: &'static str = "instance_status_reply";
155    pub const MIGRATE_OBJECT_REQUEST: &'static str = "migrate_object_request";
156    pub const MIGRATE_OBJECT_REPLY: &'static str = "migrate_object_reply";
157    pub const SHALLOW_COPY_REQUEST: &'static str = "shallow_copy_request";
158    pub const SHALLOW_COPY_REPLY: &'static str = "shallow_copy_reply";
159    pub const DEBUG_REQUEST: &'static str = "debug_command";
160    pub const DEBUG_REPLY: &'static str = "debug_reply";
161}
162
163fn check_ipc_error<'a>(root: &'a JSON, reply_type: &str) -> Result<()> {
164    if root.contains_key("code") {
165        let code = root["code"].as_u64().unwrap_or(0);
166        if code != 0 {
167            let mut error_message: String = "unable to find error message in the response".into();
168            if let Some(message) = root.get("message") {
169                if let Some(message) = message.as_str() {
170                    error_message = message.into();
171                }
172            }
173            return Err(VineyardError::new(
174                unsafe { std::mem::transmute(code as u8) },
175                error_message,
176            ));
177        }
178    }
179    if let Some(message_type) = root.get("type") {
180        return vineyard_assert(
181            message_type.as_str().map_or(false, |t| t == reply_type),
182            format!("unexpected reply type: '{}'", message_type),
183        );
184    } else {
185        return vineyard_assert(false, "no 'type' field in the response");
186    }
187}
188
189#[derive(Debug, Default)]
190pub struct RegisterRequest {
191    pub version: String,
192    pub store_type: String,
193    pub session_id: i64,
194    pub username: String,
195    pub password: String,
196    pub support_rpc_compression: bool,
197}
198
199pub fn write_register_request(r: RegisterRequest) -> JSONResult<String> {
200    return serde_json::to_string(&json!({
201        "type": Command::REGISTER_REQUEST,
202        "version": r.version,
203        "store_type": r.store_type,
204        "session_id": r.session_id,
205        "username": r.username,
206        "password": r.password,
207        "support_rpc_compression": r.support_rpc_compression,
208    }));
209}
210
211#[derive(Debug, Default)]
212pub struct RegisterReply {
213    pub ipc_socket: String,
214    pub rpc_endpoint: String,
215    pub instance_id: InstanceID,
216    pub version: String,
217    pub support_rpc_compression: bool,
218}
219
220pub fn read_register_reply(message: &str) -> Result<RegisterReply> {
221    let root: Value = serde_json::from_str(message)?;
222    let root = parse_json_object(&root)?;
223    check_ipc_error(&root, Command::REGISTER_REPLY)?;
224
225    return Ok(RegisterReply {
226        ipc_socket: get_string(root, "ipc_socket")?.into(),
227        rpc_endpoint: get_string(root, "rpc_endpoint")?.into(),
228        instance_id: get_uint(root, "instance_id")?,
229        version: get_string(root, "version")?.into(),
230        support_rpc_compression: get_bool_or(root, "support_rpc_compression", false),
231    });
232}
233
234pub fn write_exit_request() -> JSONResult<String> {
235    return serde_json::to_string(&json!({
236        "type": Command::EXIT_REQUEST,
237    }));
238}
239
240#[derive(Debug, Default)]
241pub struct CreateBufferReply {
242    pub id: ObjectID,
243    pub payload: Payload,
244    pub fd: i32,
245}
246
247pub fn write_create_buffer_request(size: usize) -> JSONResult<String> {
248    return serde_json::to_string(&json!({
249        "type": Command::CREATE_BUFFER_REQUEST,
250        "size": size,
251    }));
252}
253
254pub fn read_create_buffer_reply(message: &str) -> Result<CreateBufferReply> {
255    let root: Value = serde_json::from_str(message)?;
256    let root = parse_json_object(&root)?;
257    check_ipc_error(&root, Command::CREATE_BUFFER_REPLY)?;
258
259    let created = parse_json_object(&root["created"])?;
260    let payload = Payload::from_json(&created)?;
261
262    return Ok(CreateBufferReply {
263        id: get_uint(root, "id")?,
264        payload: payload,
265        fd: get_int32::<i32>(root, "fd")?,
266    });
267}
268
269pub fn write_create_disk_buffer_request(size: usize, path: &str) -> JSONResult<String> {
270    return serde_json::to_string(&json!({
271        "type": Command::CREATE_DISK_BUFFER_REQUEST,
272        "size": size,
273        "path": path,
274    }));
275}
276
277pub fn read_create_disk_buffer_reply(message: &str) -> Result<CreateBufferReply> {
278    let root: Value = serde_json::from_str(message)?;
279    let root = parse_json_object(&root)?;
280    check_ipc_error(&root, Command::CREATE_DISK_BUFFER_REPLY)?;
281
282    let created = parse_json_object(&root["created"])?;
283    let payload = Payload::from_json(&created)?;
284
285    return Ok(CreateBufferReply {
286        id: get_uint(root, "id")?,
287        payload: payload,
288        fd: get_int::<i64>(root, "fd")?
289            .to_i32()
290            .ok_or(VineyardError::io_error(
291                "fd received from server must be a 32-bit integer",
292            ))?,
293    });
294}
295
296#[derive(Debug, Default)]
297pub struct CreateGPUBufferReply {
298    pub id: ObjectID,
299    pub payload: Payload,
300    pub handle: Vec<i64>,
301}
302
303pub fn write_create_gpu_buffer_request(size: usize) -> JSONResult<String> {
304    return serde_json::to_string(&json!({
305        "type": Command::CREATE_GPU_BUFFER_REQUEST,
306        "size": size,
307    }));
308}
309
310pub fn read_create_gpu_buffer_reply(message: &str) -> Result<CreateGPUBufferReply> {
311    let root: Value = serde_json::from_str(message)?;
312    let root = parse_json_object(&root)?;
313    check_ipc_error(&root, Command::CREATE_GPU_BUFFER_REPLY)?;
314
315    let created = parse_json_object(&root["created"])?;
316    let payload = Payload::from_json(&created)?;
317
318    let handle = root["handle"]
319        .as_array()
320        .ok_or(VineyardError::io_error("handle is not an array"))?
321        .iter()
322        .map(|v| {
323            v.as_i64()
324                .ok_or(VineyardError::io_error("handle is not an integer"))
325        })
326        .collect::<Result<Vec<i64>>>()?;
327
328    return Ok(CreateGPUBufferReply {
329        id: get_uint(root, "id")?,
330        payload: payload,
331        handle: handle,
332    });
333}
334
335pub fn write_seal_request(id: ObjectID) -> JSONResult<String> {
336    return serde_json::to_string(&json!({
337        "type": Command::SEAL_BUFFER_REQUEST,
338        "object_id": id,
339    }));
340}
341
342pub fn read_seal_reply(message: &str) -> Result<()> {
343    let root: Value = serde_json::from_str(message)?;
344    let root = parse_json_object(&root)?;
345    check_ipc_error(&root, Command::SEAL_BUFFER_REPLY)?;
346
347    return Ok(());
348}
349
350#[derive(Debug, Default)]
351pub struct GetBuffersReply {
352    pub payloads: Vec<Payload>,
353    pub fds: Vec<i32>,
354    pub compress: bool,
355}
356
357pub fn write_get_buffers_request(ids: &[ObjectID], unsafe_: bool) -> JSONResult<String> {
358    return serde_json::to_string(&json!({
359        "type": Command::GET_BUFFERS_REQUEST,
360        "ids": ids,
361        "unsafe": unsafe_,
362    }));
363}
364
365pub fn read_get_buffers_reply(message: &str) -> Result<GetBuffersReply> {
366    let root: Value = serde_json::from_str(message)?;
367    let root = parse_json_object(&root)?;
368    check_ipc_error(&root, Command::GET_BUFFERS_REPLY)?;
369
370    let mut reply = GetBuffersReply::default();
371
372    if let Some(Value::Array(ref payloads)) = root.get("payloads") {
373        for payload in payloads {
374            reply
375                .payloads
376                .push(Payload::from_json(payload.as_object().ok_or(
377                    VineyardError::io_error(
378                        "invalid get_buffers reply: payload in message is not a JSON object",
379                    ),
380                )?)?);
381        }
382    } else {
383        let num: i64 = get_int(root, "num")?;
384        for i in 0..num {
385            match root[&i.to_string()] {
386                Value::Object(ref payload) => {
387                    reply.payloads.push(Payload::from_json(payload)?);
388                }
389                _ => {
390                    return Err(VineyardError::io_error(
391                        "invalid get_buffers reply: payload in message is not a JSON object",
392                    ));
393                }
394            }
395        }
396    }
397
398    if let Some(Value::Array(ref fds)) = root.get("fds") {
399        for fd in fds {
400            reply.fds.push(
401                fd.as_i64()
402                    .ok_or(VineyardError::io_error("fd is not an integer"))?
403                    .to_i32()
404                    .ok_or(VineyardError::io_error(
405                        "fd received from server must be a 32-bit integer",
406                    ))?,
407            );
408        }
409    }
410    return Ok(reply);
411}
412
413pub fn write_drop_buffer_request(id: ObjectID) -> JSONResult<String> {
414    return serde_json::to_string(&json!({
415        "type": Command::DROP_BUFFER_REQUEST,
416        "id": id,
417    }));
418}
419
420pub fn read_drop_buffer_reply(message: &str) -> Result<()> {
421    let root: Value = serde_json::from_str(message)?;
422    let root = parse_json_object(&root)?;
423    check_ipc_error(&root, Command::DROP_BUFFER_REPLY)?;
424
425    return Ok(());
426}
427
428pub fn write_create_remote_buffer_request(size: usize, compress: bool) -> JSONResult<String> {
429    return serde_json::to_string(&json!({
430        "type": Command::CREATE_REMOTE_BUFFER_REQUEST,
431        "size": size,
432        "compress": compress,
433    }));
434}
435
436pub fn read_create_remote_buffer_reply(message: &str) -> Result<CreateBufferReply> {
437    return read_create_buffer_reply(message);
438}
439
440pub fn write_get_remote_buffers_request(ids: &[ObjectID]) -> JSONResult<String> {
441    return serde_json::to_string(&json!({
442        "type": Command::GET_REMOTE_BUFFERS_REQUEST,
443        "ids": ids,
444    }));
445}
446
447pub fn read_get_remote_buffers_reply(message: &str) -> Result<GetBuffersReply> {
448    return read_get_buffers_reply(message);
449}
450
451pub fn write_increase_reference_count_request(id: &[ObjectID]) -> JSONResult<String> {
452    return serde_json::to_string(&json!({
453        "type": Command::INCREASE_REFERENCE_COUNT_REQUEST,
454        "ids": id,
455    }));
456}
457
458pub fn read_increase_reference_count_reply(message: &str) -> Result<()> {
459    let root: Value = serde_json::from_str(message)?;
460    let root = parse_json_object(&root)?;
461    check_ipc_error(&root, Command::INCREASE_REFERENCE_COUNT_REPLY)?;
462
463    return Ok(());
464}
465
466pub fn write_release_request(id: ObjectID) -> JSONResult<String> {
467    return serde_json::to_string(&json!({
468        "type": Command::RELEASE_REQUEST,
469        "object_id": id,
470    }));
471}
472
473pub fn read_release_reply(message: &str) -> Result<()> {
474    let root: Value = serde_json::from_str(message)?;
475    let root = parse_json_object(&root)?;
476    check_ipc_error(&root, Command::RELEASE_REPLY)?;
477
478    return Ok(());
479}
480
481#[derive(Debug, Default)]
482pub struct CreateDataReply {
483    pub id: ObjectID,
484    pub signature: Signature,
485    pub instance_id: InstanceID,
486}
487
488pub fn write_create_data_request(content: &JSON) -> JSONResult<String> {
489    return serde_json::to_string(&json!({
490        "type": Command::CREATE_DATA_REQUEST,
491        "content": content,
492    }));
493}
494
495pub fn read_create_data_reply(message: &str) -> Result<CreateDataReply> {
496    let root: Value = serde_json::from_str(message)?;
497    let root = parse_json_object(&root)?;
498    check_ipc_error(&root, "create_data_reply")?;
499
500    return Ok(CreateDataReply {
501        id: get_uint(root, "id")?,
502        signature: get_uint(root, "signature")?,
503        instance_id: get_uint(root, "instance_id")?,
504    });
505}
506
507pub fn write_get_data_request(id: ObjectID, sync_remote: bool, wait: bool) -> JSONResult<String> {
508    return serde_json::to_string(&json!({
509        "type": Command::GET_DATA_REQUEST,
510        "id": vec![id],
511        "sync_remote": sync_remote,
512        "wait": wait,
513    }));
514}
515
516pub fn read_get_data_reply(message: &str) -> Result<JSON> {
517    let root: Value = serde_json::from_str(message)?;
518    let root = parse_json_object(&root)?;
519    check_ipc_error(&root, "get_data_reply")?;
520
521    match root["content"] {
522        Value::Array(ref content) => {
523            if content.len() != 1 {
524                return Err(VineyardError::io_error(
525                    "failed to read get_data reply: content array's length is not 1",
526                ));
527            }
528            return Ok(parse_json_object(&content[0])?.clone());
529        }
530        Value::Object(ref content) => match content.iter().next() {
531            None => {
532                return Err(VineyardError::io_error(
533                    "failed to read get_data reply: content dict's length is not 1",
534                ));
535            }
536            Some((_, meta)) => {
537                return Ok(parse_json_object(meta)?.clone());
538            }
539        },
540        _ => {
541            return Err(VineyardError::io_error(
542                "failed to read get_data reply: content is not an array or a dict",
543            ));
544        }
545    }
546}
547
548pub fn write_get_data_batch_request(
549    ids: &[ObjectID],
550    sync_remote: bool,
551    wait: bool,
552) -> JSONResult<String> {
553    return serde_json::to_string(&json!({
554        "type": Command::GET_DATA_REQUEST,
555        "id": ids,
556        "sync_remote": sync_remote,
557        "wait": wait,
558    }));
559}
560
561pub fn read_get_data_batch_reply(message: &str) -> Result<HashMap<ObjectID, JSON>> {
562    let root: Value = serde_json::from_str(message)?;
563    let root = parse_json_object(&root)?;
564    check_ipc_error(&root, "get_data_reply")?;
565
566    match root["content"] {
567        Value::Array(ref content) => {
568            let mut data = HashMap::new();
569            for item in content {
570                let object = parse_json_object(&item)?;
571                data.insert(
572                    object_id_from_string(get_string(object, "id")?)?,
573                    object.clone(),
574                );
575            }
576            return Ok(data);
577        }
578        Value::Object(ref content) => {
579            let mut data = HashMap::new();
580            for (id, object) in content.iter() {
581                data.insert(
582                    object_id_from_string(id)?,
583                    parse_json_object(object)?.clone(),
584                );
585            }
586            return Ok(data);
587        }
588        _ => {
589            return Err(VineyardError::io_error(
590                "failed to read get_data reply: content is not an array or a dict",
591            ));
592        }
593    }
594}
595
596pub fn write_list_data_request(pattern: &str, regex: bool, limit: usize) -> JSONResult<String> {
597    return serde_json::to_string(&json!({
598        "type": Command::LIST_DATA_REQUEST,
599        "pattern": pattern,
600        "regex": regex,
601        "limit": limit,
602    }));
603}
604
605pub fn read_list_data_reply(message: &str) -> Result<Vec<JSON>> {
606    let root: Value = serde_json::from_str(message)?;
607    let root = parse_json_object(&root)?;
608    check_ipc_error(&root, "list_data_reply")?;
609
610    let mut reply = Vec::new();
611    match root["content"] {
612        Value::Array(ref data) => {
613            for item in data {
614                reply.push(parse_json_object(&item)?.clone());
615            }
616            return Ok(reply);
617        }
618        _ => {
619            return Err(VineyardError::io_error(
620                "failed to read list_data reply: data is not an array",
621            ));
622        }
623    }
624}
625
626pub fn write_delete_data_request(
627    id: ObjectID,
628    force: bool,
629    deep: bool,
630    fastpath: bool,
631) -> JSONResult<String> {
632    return serde_json::to_string(&json!({
633        "type": Command::DELETE_DATA_REQUEST,
634        "id": vec![id],
635        "force": force,
636        "deep:": deep,
637        "fastpath": fastpath,
638    }));
639}
640
641pub fn write_delete_data_batch_request(
642    ids: &[ObjectID],
643    force: bool,
644    deep: bool,
645    fastpath: bool,
646) -> JSONResult<String> {
647    return serde_json::to_string(&json!({
648        "type": Command::DELETE_DATA_REQUEST,
649        "id": ids,
650        "force": force,
651        "deep:": deep,
652        "fastpath": fastpath,
653    }));
654}
655
656pub fn read_delete_data_reply(message: &str) -> Result<()> {
657    let root: Value = serde_json::from_str(message)?;
658    let root = parse_json_object(&root)?;
659    check_ipc_error(&root, Command::DELETE_DATA_REPLY)?;
660
661    return Ok(());
662}
663
664pub fn write_exists_request(id: ObjectID) -> JSONResult<String> {
665    return serde_json::to_string(&json!({
666        "type": Command::EXISTS_REQUEST,
667        "id": id,
668    }));
669}
670
671pub fn read_exists_reply(message: &str) -> Result<bool> {
672    let root: Value = serde_json::from_str(message)?;
673    let root = parse_json_object(&root)?;
674    check_ipc_error(&root, Command::EXISTS_REPLY)?;
675
676    return Ok(get_bool_or(root, "exists", false));
677}
678
679pub fn write_persist_request(id: ObjectID) -> JSONResult<String> {
680    return serde_json::to_string(&json!({
681        "type": Command::PERSIST_REQUEST,
682        "id": id,
683    }));
684}
685
686pub fn read_persist_reply(message: &str) -> Result<()> {
687    let root: Value = serde_json::from_str(message)?;
688    let root = parse_json_object(&root)?;
689    check_ipc_error(&root, Command::PERSIST_REPLY)?;
690
691    return Ok(());
692}
693
694pub fn write_if_persist_request(id: ObjectID) -> JSONResult<String> {
695    return serde_json::to_string(&json!({
696        "type": Command::IF_PERSIST_REQUEST,
697        "id": id,
698    }));
699}
700
701pub fn read_if_persist_reply(message: &str) -> Result<bool> {
702    let root: Value = serde_json::from_str(message)?;
703    let root = parse_json_object(&root)?;
704    check_ipc_error(&root, Command::IF_PERSIST_REPLY)?;
705
706    return Ok(get_bool_or(root, "persist", false));
707}
708
709pub fn write_label_request(id: ObjectID, keys: &[String], values: &[String]) -> JSONResult<String> {
710    return serde_json::to_string(&json!({
711        "type": Command::LABEL_REQUEST,
712        "id": id,
713        "keys": keys,
714        "values": values,
715    }));
716}
717
718pub fn read_label_reply(message: &str) -> Result<()> {
719    let root: Value = serde_json::from_str(message)?;
720    let root = parse_json_object(&root)?;
721    check_ipc_error(&root, Command::LABEL_REPLY)?;
722
723    return Ok(());
724}
725
726pub fn write_clear_request() -> JSONResult<String> {
727    return serde_json::to_string(&json!({
728        "type": Command::CLEAR_REQUEST,
729    }));
730}
731
732pub fn read_clear_reply(message: &str) -> Result<()> {
733    let root: Value = serde_json::from_str(message)?;
734    let root = parse_json_object(&root)?;
735    check_ipc_error(&root, Command::CLEAR_REPLY)?;
736
737    return Ok(());
738}
739
740pub fn write_put_name_request(object_id: ObjectID, name: &str) -> JSONResult<String> {
741    return serde_json::to_string(&json!({
742        "type": Command::PUT_NAME_REQUEST,
743        "object_id": object_id,
744        "name": name,
745    }));
746}
747
748pub fn read_put_name_reply(message: &str) -> Result<()> {
749    let root: Value = serde_json::from_str(message)?;
750    let root = parse_json_object(&root)?;
751    check_ipc_error(&root, Command::PUT_NAME_REPLY)?;
752
753    return Ok(());
754}
755
756pub fn write_get_name_request(name: &str, wait: bool) -> JSONResult<String> {
757    return serde_json::to_string(&json!({
758        "type": Command::GET_NAME_REQUEST,
759        "name": name,
760        "wait": wait,
761    }));
762}
763
764pub fn read_get_name_reply(message: &str) -> Result<ObjectID> {
765    let root: Value = serde_json::from_str(message)?;
766    let root = parse_json_object(&root)?;
767    check_ipc_error(&root, Command::GET_NAME_REPLY)?;
768
769    return get_uint(root, "object_id");
770}
771
772pub fn write_list_name_request(pattern: &str, regex: bool, limit: usize) -> JSONResult<String> {
773    return serde_json::to_string(&json!({
774        "type": Command::LIST_NAME_REQUEST,
775        "pattern": pattern,
776        "regex": regex,
777        "limit": limit,
778    }));
779}
780
781pub fn read_list_name_reply(message: &str) -> Result<HashMap<String, ObjectID>> {
782    let root: Value = serde_json::from_str(message)?;
783    let root = parse_json_object(&root)?;
784    check_ipc_error(&root, Command::LIST_NAME_REPLY)?;
785
786    let names = parse_json_object(
787        root.get("names")
788            .ok_or(VineyardError::io_error("message does not contain names"))?,
789    )?;
790    let mut result = HashMap::new();
791    for (name, value) in names {
792        match value.as_u64() {
793            None => {}
794            Some(id) => {
795                result.insert(name.clone(), id);
796            }
797        }
798    }
799    return Ok(result);
800}
801
802pub fn write_drop_name_request(name: &str) -> JSONResult<String> {
803    return serde_json::to_string(&json!({
804        "type": Command::DROP_NAME_REQUEST,
805        "name": name,
806    }));
807}
808
809pub fn read_drop_name_reply(message: &str) -> Result<()> {
810    let root: Value = serde_json::from_str(message)?;
811    let root = parse_json_object(&root)?;
812    check_ipc_error(&root, Command::DROP_NAME_REPLY)?;
813
814    return Ok(());
815}
816
817pub fn write_evict_request(ids: &[ObjectID]) -> JSONResult<String> {
818    return serde_json::to_string(&json!({
819        "type": Command::EVICT_REQUEST,
820        "ids": ids,
821    }));
822}
823
824pub fn read_evict_reply(message: &str) -> Result<()> {
825    let root: Value = serde_json::from_str(message)?;
826    let root = parse_json_object(&root)?;
827    check_ipc_error(&root, Command::EVICT_REPLY)?;
828
829    return Ok(());
830}
831
832pub fn write_load_request(ids: &[ObjectID], pin: bool) -> JSONResult<String> {
833    return serde_json::to_string(&json!({
834        "type": Command::LOAD_REQUEST,
835        "ids": ids,
836        "pin": pin,
837    }));
838}
839
840pub fn read_load_reply(message: &str) -> Result<()> {
841    let root: Value = serde_json::from_str(message)?;
842    let root = parse_json_object(&root)?;
843    check_ipc_error(&root, Command::LOAD_REPLY)?;
844
845    return Ok(());
846}
847
848pub fn write_unpin_request(ids: &[ObjectID]) -> JSONResult<String> {
849    return serde_json::to_string(&json!({
850        "type": Command::UNPIN_REQUEST,
851        "ids": ids,
852    }));
853}
854
855pub fn read_unpin_reply(message: &str) -> Result<()> {
856    let root: Value = serde_json::from_str(message)?;
857    let root = parse_json_object(&root)?;
858    check_ipc_error(&root, Command::UNPIN_REPLY)?;
859
860    return Ok(());
861}
862
863pub fn write_is_spilled_request(id: ObjectID) -> JSONResult<String> {
864    return serde_json::to_string(&json!({
865        "type": Command::IS_SPILLED_REQUEST,
866        "id": id,
867    }));
868}
869
870pub fn read_is_spilled_reply(message: &str) -> Result<bool> {
871    let root: Value = serde_json::from_str(message)?;
872    let root = parse_json_object(&root)?;
873    check_ipc_error(&root, Command::IS_SPILLED_REPLY)?;
874
875    return Ok(get_bool_or(root, "is_spilled", false));
876}
877
878pub fn write_is_inuse_request(id: ObjectID) -> JSONResult<String> {
879    return serde_json::to_string(&json!({
880        "type": Command::IS_IN_USE_REQUEST,
881        "id": id,
882    }));
883}
884
885pub fn read_is_inuse_reply(message: &str) -> Result<bool> {
886    let root: Value = serde_json::from_str(message)?;
887    let root = parse_json_object(&root)?;
888    check_ipc_error(&root, Command::IS_IN_USE_REPLY)?;
889
890    return Ok(get_bool_or(root, "is_in_use", false));
891}
892
893pub fn write_migrate_object_request(id: ObjectID) -> JSONResult<String> {
894    return serde_json::to_string(&json!({
895        "type": Command::MIGRATE_OBJECT_REQUEST,
896        "object_id": id,
897    }));
898}
899
900pub fn read_migrate_object_reply(message: &str) -> Result<ObjectID> {
901    let root: Value = serde_json::from_str(message)?;
902    let root = parse_json_object(&root)?;
903    check_ipc_error(&root, Command::MIGRATE_OBJECT_REPLY)?;
904
905    return get_uint(root, "object_id");
906}