// actix-elastic 1.0.7
//
// Elasticsearch 7.x actix actor
use serde::{Deserialize, Serialize};

use crate::response::common::Shards;

use std::fmt;

/// Top-level body of a bulk response, (de)serialized with serde.
///
/// The JSON keys are `took`, `errors` and `items`, matching the field names.
#[derive(Debug, Serialize, Deserialize)]
pub struct BulkResponse {
    /// Value of the `took` key (presumably the request duration in milliseconds,
    /// as reported by Elasticsearch — confirm against the bulk API reference).
    pub took: i32,
    /// Value of the `errors` key; the fixtures in this module set it to `true`
    /// when at least one item carries an `error` object.
    pub errors: bool,
    /// One entry per bulk action, in the order they appear in the `items` array.
    pub items: Vec<Item>,
}

impl BulkResponse {
    /// Iterates over the items for which [`Item::success`] returns `true`,
    /// preserving their order in `items`.
    pub fn succeed_items(&self) -> impl Iterator<Item = &Item> {
        self.items.iter().filter(|item| item.success())
    }

    /// Returns a clone of the first error carried by a non-successful item,
    /// or `None` when no failed item carries one.
    ///
    /// A failed item does not necessarily have an `error` object (a delete of a
    /// missing document reports status 404 with `result: "not_found"` and no
    /// error), so such items are skipped instead of ending the search: an error
    /// attached to a later item is still reported.
    pub fn first_error(&self) -> Option<ItemError> {
        self.items
            .iter()
            .filter(|item| !item.success())
            .find_map(|item| item.error())
    }
}

/// One entry of the `items` array in a bulk response.
///
/// Each entry is a single-key JSON object whose key is the lowercase action
/// name (`create`, `delete`, `index` or `update`) and whose value is the
/// per-action [`ItemBody`].
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Item {
    /// Entry keyed by `create`.
    Create(ItemBody),
    /// Entry keyed by `delete`.
    Delete(ItemBody),
    /// Entry keyed by `index`.
    Index(ItemBody),
    /// Entry keyed by `update`.
    Update(ItemBody),
}

impl Item {
    pub fn success(&self) -> bool {
        match *self {
            Item::Create(ref body)
            | Item::Delete(ref body)
            | Item::Index(ref body)
            | Item::Update(ref body) => body.status >= 200 && body.status < 400,
        }
    }

    pub fn error(&self) -> Option<ItemError> {
        match *self {
            Item::Create(ref body)
            | Item::Delete(ref body)
            | Item::Index(ref body)
            | Item::Update(ref body) => body.error.clone(),
        }
    }

    pub fn get_id(&self) -> String {
        match *self {
            Item::Create(ref body)
            | Item::Delete(ref body)
            | Item::Index(ref body)
            | Item::Update(ref body) => body.id.clone(),
        }
    }
}

/// Payload of a single bulk item, shared by every [`Item`] variant.
///
/// `_index`, `_type`, `_id` and `status` are required when deserializing; every
/// other key is optional. The failed items in this module's test fixtures, for
/// instance, carry no `_version`, `_shards`, `_seq_no`, `_primary_term` or
/// `result`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ItemBody {
    /// Value of the `_index` key.
    #[serde(rename = "_index")]
    pub index: String,
    /// Value of the `_type` key (`"_doc"` in the fixtures of this module).
    #[serde(rename = "_type")]
    pub ty: String,
    /// Value of the `_id` key.
    #[serde(rename = "_id")]
    pub id: String,
    /// Value of the `_version` key, when present.
    #[serde(rename = "_version")]
    pub version: Option<i32>,
    /// Value of the `_shards` object, when present.
    #[serde(rename = "_shards")]
    pub shards: Option<Shards>,
    /// Value of the `_seq_no` key, when present.
    #[serde(rename = "_seq_no")]
    pub seq_no: Option<i64>,
    /// Value of the `_primary_term` key, when present.
    #[serde(rename = "_primary_term")]
    pub primary_term: Option<i64>,
    /// Value of the `result` key, when present (e.g. `"created"`, `"updated"`,
    /// `"not_found"` in the fixtures of this module).
    pub result: Option<String>,
    /// HTTP-style status code of the item; `Item::success` treats `200..400`
    /// as success.
    pub status: i32,
    /// Error object of the item, when present.
    pub error: Option<ItemError>,
}

/// Error object attached to a failed bulk item.
///
/// Only `type` and `reason` are required when deserializing. Elasticsearch
/// attaches the index metadata (`index_uuid`, `shard`, `index`) only to errors
/// that are tied to a concrete index/shard; others (for example
/// `mapper_parsing_exception`) omit it. Those keys are therefore optional so
/// that one such error does not make the whole bulk response fail to
/// deserialize. Unknown keys such as `caused_by` are ignored.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ItemError {
    /// Exception type, e.g. `document_missing_exception`.
    #[serde(rename = "type")]
    ty: String,
    /// Human-readable description; this is what `Display` prints.
    reason: String,
    /// UUID of the index the error refers to, when reported.
    index_uuid: Option<String>,
    /// Shard the error refers to, when reported (a string such as `"0"`).
    shard: Option<String>,
    /// Name of the index the error refers to, when reported.
    index: Option<String>,
}

// Marker impl: every `std::error::Error` method keeps its default. The required
// `Debug` comes from the derive on `ItemError`, and `Display` is implemented in
// this module.
impl std::error::Error for ItemError {}

impl fmt::Display for ItemError {
    /// Writes the literal prefix `reason: ` followed by the error's `reason`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("reason: ")?;
        f.write_str(&self.reason)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a fixture, panicking with the serde error when the JSON does
    /// not fit the response model.
    fn parse(raw: &str) -> BulkResponse {
        serde_json::from_str(raw).expect("fixture should deserialize into BulkResponse")
    }

    /// A response without any `error` object: every item must parse, and the
    /// helper methods must agree with the per-item status codes.
    #[test]
    fn test_deserialize_bulk_response_all_success() {
        let resp_str = r#"{
   "took": 30,
   "errors": false,
   "items": [
      {
         "index": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 0,
            "_primary_term": 1
         }
      },
      {
         "delete": {
            "_index": "test",
            "_type": "_doc",
            "_id": "2",
            "_version": 1,
            "result": "not_found",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 404,
            "_seq_no" : 1,
            "_primary_term" : 2
         }
      },
      {
         "create": {
            "_index": "test",
            "_type": "_doc",
            "_id": "3",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 2,
            "_primary_term" : 3
         }
      },
      {
         "update": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 2,
            "result": "updated",
            "_shards": {
                "total": 2,
                "successful": 1,
                "failed": 0
            },
            "status": 200,
            "_seq_no" : 3,
            "_primary_term" : 4
         }
      }
   ]
}"#;

        let resp = parse(resp_str);
        assert_eq!(resp.took, 30);
        assert!(!resp.errors);
        assert_eq!(resp.items.len(), 4);

        let ids: Vec<String> = resp.items.iter().map(Item::get_id).collect();
        assert_eq!(ids, vec!["1", "2", "3", "1"]);

        // The delete targets a missing document: status 404 but no `error`
        // object, so it is not counted as succeeded and yields no error either.
        assert_eq!(resp.succeed_items().count(), 3);
        assert!(resp.first_error().is_none());
    }

    /// A response where two updates failed with an `error` object and one
    /// create succeeded.
    #[test]
    fn test_deserialize_bulk_response_with_error() {
        let resp_str = r#"{
  "took": 486,
  "errors": true,
  "items": [
    {
      "update": {
        "_index": "index1",
        "_type" : "_doc",
        "_id": "5",
        "status": 404,
        "error": {
          "type": "document_missing_exception",
          "reason": "[_doc][5]: document missing",
          "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
          "shard": "0",
          "index": "index1"
        }
      }
    },
    {
      "update": {
        "_index": "index1",
        "_type" : "_doc",
        "_id": "6",
        "status": 404,
        "error": {
          "type": "document_missing_exception",
          "reason": "[_doc][6]: document missing",
          "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
          "shard": "0",
          "index": "index1"
        }
      }
    },
    {
      "create": {
        "_index": "index1",
        "_type" : "_doc",
        "_id": "7",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    }
  ]
}"#;

        let resp = parse(resp_str);
        assert_eq!(resp.took, 486);
        assert!(resp.errors);
        assert_eq!(resp.items.len(), 3);

        // Only the create succeeded.
        let succeeded: Vec<String> = resp.succeed_items().map(Item::get_id).collect();
        assert_eq!(succeeded, vec!["7"]);

        // The first failing item is the update of document 5.
        let first = resp
            .first_error()
            .expect("a response with failed updates should expose an error");
        assert_eq!(first.to_string(), "reason: [_doc][5]: document missing");

        // Each failed update carries its own error; the successful create has none.
        let second = resp.items[1]
            .error()
            .expect("the second update should carry an error");
        assert_eq!(second.to_string(), "reason: [_doc][6]: document missing");
        assert!(resp.items[2].error().is_none());
        assert!(!resp.items[0].success());
        assert!(!resp.items[1].success());
        assert!(resp.items[2].success());
    }
}