1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2021 The Engula Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//! A [`Kernel`] implementation that interacts with gRPC kernel service.
//!
//! [`Kernel`]: crate::Kernel

mod client;
mod compose;
mod error;
mod kernel;
mod proto;
mod server;

pub use self::{
    client::Client,
    compose::{FileKernel, Kernel as ComposeKernel, MemKernel},
    kernel::Kernel,
    server::Server,
};

#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use tokio::net::TcpListener;
    use tokio_stream::wrappers::TcpListenerStream;

    use super::{MemKernel, Server};
    use crate::*;

    async fn mock_journal_and_storage_server(
    ) -> std::result::Result<String, Box<dyn std::error::Error>> {
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let local_addr = listener.local_addr()?;
        tokio::task::spawn(async move {
            let journal = mem::Journal::default();
            let storage = mem::Storage::default();
            tonic::transport::Server::builder()
                .add_service(engula_journal::grpc::Server::new(journal).into_service())
                .add_service(engula_storage::grpc::Server::new(storage).into_service())
                .serve_with_incoming(TcpListenerStream::new(listener))
                .await
                .unwrap();
        });

        Ok(local_addr.to_string())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let address = mock_journal_and_storage_server().await?;
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let local_addr = listener.local_addr()?;
        tokio::task::spawn(async move {
            let kernel = MemKernel::open(&address, &address).await.unwrap();
            let server = Server::new(&address, &address, kernel);
            tonic::transport::Server::builder()
                .add_service(server.into_service())
                .serve_with_incoming(TcpListenerStream::new(listener))
                .await
                .unwrap();
        });

        let kernel = grpc::Kernel::connect(&local_addr.to_string()).await?;
        let version = kernel.current_version().await?;
        assert_eq!(version.sequence, 0);
        assert_eq!(version.meta.len(), 0);
        assert_eq!(version.objects.len(), 0);

        let handle = {
            let mut expect = VersionUpdate {
                sequence: 1,
                ..Default::default()
            };
            expect.add_meta.insert("a".to_owned(), b"b".to_vec());
            expect.remove_meta.push("b".to_owned());
            expect.add_objects.push("a".to_owned());
            expect.remove_objects.push("b".to_owned());
            let mut version_updates = kernel.version_updates(0).await;
            tokio::spawn(async move {
                let update = version_updates.try_next().await.unwrap().unwrap();
                assert_eq!(*update, expect);
            })
        };

        let mut update = KernelUpdate::default();
        update.add_meta("a", "b");
        update.remove_meta("b");
        update.add_object("a");
        update.remove_object("b");
        kernel.apply_update(update).await?;

        handle.await.unwrap();

        let new_version = kernel.current_version().await?;
        assert_eq!(new_version.sequence, 1);
        assert_eq!(new_version.meta.len(), 1);
        assert_eq!(new_version.objects.len(), 1);

        Ok(())
    }
}