grpcio 0.2.1

The rust language implementation of gRPC, base on the gRPC c core library.
Documentation
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate futures;
extern crate grpcio;
extern crate grpcio_proto;
#[macro_use]
extern crate log;
extern crate rand;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;

mod util;
#[path = "../log_util.rs"]
mod log_util;

use std::sync::Arc;
use std::time::Duration;
use std::thread;

use futures::{future, Future, Sink, Stream};
use grpcio::*;
use grpcio_proto::example::route_guide::{Point, Rectangle, RouteNote};
use grpcio_proto::example::route_guide_grpc::RouteGuideClient;
use rand::Rng;

fn new_point(lat: i32, lon: i32) -> Point {
    let mut point = Point::new();
    point.set_latitude(lat);
    point.set_longitude(lon);
    point
}

fn new_rect(lat1: i32, lon1: i32, lat2: i32, lon2: i32) -> Rectangle {
    let mut rect = Rectangle::new();
    rect.set_lo(new_point(lat1, lon1));
    rect.set_hi(new_point(lat2, lon2));
    rect
}

fn new_note(lat: i32, lon: i32, msg: &str) -> RouteNote {
    let mut note = RouteNote::new();
    note.set_location(new_point(lat, lon));
    note.set_message(msg.to_owned());
    note
}

fn get_feature(client: &RouteGuideClient, point: &Point) {
    let get_feature = client.get_feature_async(point).unwrap();
    match get_feature.wait() {
        Err(e) => panic!("RPC failed: {:?}", e),
        Ok(f) => {
            if !f.has_location() {
                warn!("Server returns incomplete feature.");
                return;
            }
            if f.get_name().is_empty() {
                warn!("Found no feature at {}", util::format_point(point));
                return;
            }
            info!(
                "Found feature called {} at {}",
                f.get_name(),
                util::format_point(point)
            );
        }
    }
}

fn list_features(client: &RouteGuideClient) {
    let rect = new_rect(400_000_000, -750_000_000, 420_000_000, -730_000_000);
    info!("Looking for features between 40, -75 and 42, -73");
    let mut list_features = client.list_features(&rect).unwrap();
    loop {
        let f = list_features.into_future();
        match f.wait() {
            Ok((Some(feature), s)) => {
                list_features = s;
                let loc = feature.get_location();
                info!(
                    "Found feature {} at {}",
                    feature.get_name(),
                    util::format_point(loc)
                );
            }
            Ok((None, _)) => break,
            Err((e, _)) => panic!("List features failed: {:?}", e),
        }
    }
    info!("List feature rpc succeeded.");
}

fn record_route(client: &RouteGuideClient) {
    let features = util::load_db();
    let mut rng = rand::thread_rng();
    let (mut sink, receiver) = client.record_route().unwrap();
    for _ in 0..10 {
        let f = rng.choose(&features).unwrap();
        let point = f.get_location();
        info!("Visiting {}", util::format_point(point));
        sink = sink.send((point.to_owned(), WriteFlags::default()))
            .wait()
            .unwrap();
        thread::sleep(Duration::from_millis(rng.gen_range(500, 1500)));
    }
    // flush
    future::poll_fn(|| sink.close()).wait().unwrap();
    let sumary = receiver.wait().unwrap();
    info!("Finished trip with {} points", sumary.get_point_count());
    info!("Passed {} features", sumary.get_feature_count());
    info!("Travelled {} meters", sumary.get_distance());
    info!("It took {} seconds", sumary.get_elapsed_time());
}

fn route_chat(client: &RouteGuideClient) {
    let (mut sink, mut receiver) = client.route_chat().unwrap();
    let h = thread::spawn(move || {
        let notes = vec![
            ("First message", 0, 0),
            ("Second message", 0, 1),
            ("Third message", 1, 0),
            ("Fourth message", 0, 0),
        ];

        for (msg, lat, lon) in notes {
            let note = new_note(lat, lon, msg);
            info!("Sending message {} at {}, {}", msg, lat, lon);
            sink = sink.send((note, WriteFlags::default())).wait().unwrap();
        }
        future::poll_fn(|| sink.close()).wait().unwrap();
    });

    loop {
        match receiver.into_future().wait() {
            Ok((Some(note), r)) => {
                let location = note.get_location();
                info!(
                    "Got message {} at {}, {}",
                    note.get_message(),
                    location.get_latitude(),
                    location.get_longitude()
                );
                receiver = r;
            }
            Ok((None, _)) => break,
            Err((e, _)) => panic!("RouteChat RPC failed: {:?}", e),
        }
    }

    h.join().unwrap();
}

fn main() {
    let _guard = log_util::init_log(None);
    let env = Arc::new(Environment::new(2));
    let channel = ChannelBuilder::new(env).connect("127.0.0.1:50051");
    let client = RouteGuideClient::new(channel);

    info!("-------------- GetFeature --------------");
    get_feature(&client, &new_point(409_146_138, -746_188_906));
    get_feature(&client, &new_point(0, 0));

    info!("-------------- ListFeatures --------------");
    list_features(&client);

    info!("-------------- RecordRoute --------------");
    record_route(&client);

    info!("-------------- RouteChat --------------");
    route_chat(&client);
}