1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use uuid::Uuid;
use aorist_core::{PushshiftAPILocation, HiveLocation, RemoteStorage, DataSet, MinioLocation, HiveTableStorage, WebLocation, OnPremiseLocation, FasttextEmbeddingSchema};

// Constraint: DownloadDataFromRemoteWebLocation, rooted at a WebLocation concept.
// Attach-predicate is unconditionally true (fires for every WebLocation) and the
// required-programs closure returns no entries. No upstream constraint dependencies.
// NOTE(review): the second macro argument (`true`) presumably marks this constraint
// as requiring a satisfying program — confirm against define_constraint!'s definition.
define_constraint!(DownloadDataFromRemoteWebLocation, true, SatisfyDownloadDataFromRemoteWebLocation, WebLocation, Constraint, Some("Downloading data from remote web location".to_string()), Some("Data for this particular asset(s) is located somewhere on the web.
We need to download it to a local directory first, before we can
do anything with it.".to_string()), |_, _| true, |_, _| Vec::new());

// Constraint: ORCTableSchemasCreated, rooted at a HiveTableStorage concept.
// Attach-predicate: only fires when the root has a ReplicationStorageSetup
// ancestor (the `replication_storage_setup(...)` ancestry lookup succeeds).
// Required-programs closure returns no entries; no upstream dependencies.
define_constraint!(ORCTableSchemasCreated, true, SatisfyORCTableSchemasCreated, HiveTableStorage, Constraint, Some("Creating Table Schemas".to_string()), Some("We will be uploading tabular data into our warehouse. Before we upload
data files we need to create schemas for the tables which will refer
to these files.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry|
ancestry.replication_storage_setup(root.clone()).is_ok(), |_, _| Vec::new());

// Constraint: HiveDirectoriesCreated, rooted at a HiveLocation concept.
// Attach-predicate: fires only when the root has BOTH a HiveTableStorage
// ancestor AND a ReplicationStorageSetup ancestor (both lookups are Ok).
// Required-programs closure returns no entries; no upstream dependencies.
define_constraint!(HiveDirectoriesCreated, true, SatisfyHiveDirectoriesCreated, HiveLocation, Constraint, Some("Created hive directories.".to_string()), Some("We need to create directories or buckets (depending on file system / storage
solution) in which we will store our Hive data.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry|
    ancestry.hive_table_storage(root.clone()).is_ok() &&
    ancestry.replication_storage_setup(root.clone()).is_ok(), |_, _| Vec::new());

// Constraint: DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON,
// rooted at a PushshiftAPILocation concept. Depends on HiveDirectoriesCreated.
// Attach-predicate: fires only when the root has a ReplicationStorageSetup
// ancestor AND that setup's tmp_encoding is NewlineDelimitedJSON or CSV.
define_constraint!(DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON, true, SatisfyDownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON, PushshiftAPILocation, Constraint, Some("Downloading data from the Pushshift API".to_string()), Some("Data for this particular asset(s) is located in the Pushshift API.
We need to download it to a local directory first, before we can
do anything with it.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry|
match ancestry.replication_storage_setup(root.clone()) {
    Ok(x) => {
      debug!("Root has ReplicationStorageSetup.");
      // x is AoristRef-wrapped: unwrap the RwLock read guards to inspect tmp_encoding.
      match *x.0.read().unwrap().tmp_encoding.0.read().unwrap() {
          aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
          aorist_core::Encoding::CSVEncoding(_) => true,
          _ => false,
      }
    }
    _ => false,
}, |_, _| Vec::new(), HiveDirectoriesCreated);

// Constraint: JSONTableSchemasCreated, rooted at a HiveTableStorage concept.
// Depends on HiveDirectoriesCreated. Attach-predicate: fires only when the
// storage's encoding is NewlineDelimitedJSON.
// NOTE(review): the `.unwrap()` on hive_table_storage(...) will panic if the
// ancestry lookup fails; presumably it always succeeds because the constraint
// root IS a HiveTableStorage — confirm, or match on the Result like the other
// constraints in this file do.
define_constraint!(JSONTableSchemasCreated, true, SatisfyJSONTableSchemasCreated, HiveTableStorage, Constraint, Some("Create schemas for temporary JSON tables.".to_string()), Some("We will use Hive tables with external storage as a staging location for our
data. We need to create these schemas to be able to write data to them.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
   match &*ancestry.hive_table_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap() {
      aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
      _ => false,
   }
}, |_, _| Vec::new(), HiveDirectoriesCreated);

// Constraint: ReadyForUpload, rooted at the DataSet concept; no description.
// Depends on DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON.
// NOTE(review): second argument is `false` here (unlike most constraints in this
// file) — presumably this is a pure grouping/ordering barrier with no program of
// its own; confirm against define_constraint!'s definition.
define_constraint!(ReadyForUpload, false, SatisfyReadyForUpload, DataSet, Constraint, Some("Convert data".to_string()), None, |_, _| true, |_, _| Vec::new(), DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON);

// Constraint: ConvertJSONToCSV, rooted at a RemoteStorage concept.
// Depends on DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON.
// Attach-predicate: fires only when the remote storage's encoding is
// NewlineDelimitedJSON AND the root has a ReplicationStorageSetup ancestor.
// NOTE(review): `.unwrap()` on remote_storage(...) panics if the lookup fails;
// presumably safe because the root IS a RemoteStorage — confirm.
define_constraint!(ConvertJSONToCSV, true, SatisfyConvertJSONToCSV, RemoteStorage, Constraint, Some("Convert JSON data to CSV".to_string()), Some("We need to convert the JSON data to CSV format to process it further.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
    match *ancestry.remote_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap() {
        aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => {
            ancestry.replication_storage_setup(root.clone()).is_ok()
        },
        _ => false,
    }
}, |_, _| Vec::new(), DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON);

// Constraint: UploadDataToMinio, rooted at a MinioLocation concept.
// Depends on ReadyForUpload and HiveDirectoriesCreated.
// Attach-predicate: fires only when the root has a ReplicationStorageSetup
// ancestor. Required-programs closure returns no entries.
define_constraint!(UploadDataToMinio, true, SatisfyUploadDataToMinio, MinioLocation, Constraint, Some("Upload data to Min.IO".to_string()), Some("Now that data has been pre-processed we can upload it to the underlying
Min.IO storage.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
  ancestry.replication_storage_setup(root.clone()).is_ok()
}, |_, _| Vec::new(), ReadyForUpload, HiveDirectoriesCreated);

// Constraint: UploadDataToLocal, rooted at an OnPremiseLocation concept; no
// title or description. Depends on UploadDataToMinio. Attach-predicate: fires
// only when the root has a ReplicationStorageSetup ancestor.
// NOTE(review): second argument is `false` — presumably an ordering/grouping
// node with no program of its own, like ReadyForUpload; confirm.
define_constraint!(UploadDataToLocal, false, SatisfyUploadDataToLocal, OnPremiseLocation, Constraint, None, None, |root: AoristRef<Concept>, ancestry: &ConceptAncestry|
ancestry.replication_storage_setup(root.clone()).is_ok(), |_, _| Vec::new(), UploadDataToMinio);

// Constraint: ConvertJSONTableToORCTable, rooted at a HiveTableStorage concept.
// Depends on JSONTableSchemasCreated, UploadDataToLocal, ORCTableSchemasCreated.
// Attach-predicate: fires only when the root's StaticDataTable ancestor exists
// and that table's setup is a ReplicationStorageSetup.
define_constraint!(ConvertJSONTableToORCTable, true, SatisfyConvertJSONTableToORCTable, HiveTableStorage, Constraint, Some("Convert JSON Table to ORC Table".to_string()), Some("Hive tables can be stored in external JSON format, but this is inefficient.
We can convert them to ORC (the native Hive format) to speed up access.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry|
match ancestry.static_data_table(root.clone()) {
    Ok(sdt) => match &*sdt.0.read().unwrap().setup.0.read().unwrap() {
        aorist_core::StorageSetup::ReplicationStorageSetup(_) => true,
        _ => false,
    },
    _ => false,
}, |_, _| Vec::new(), JSONTableSchemasCreated, UploadDataToLocal, ORCTableSchemasCreated);

// Constraint: FasttextTrainingData, rooted at a FasttextEmbeddingSchema concept.
// Depends on ConvertJSONTableToORCTable (training data comes from the converted
// Hive table). Attach-predicate is unconditionally true for this root type.
define_constraint!(FasttextTrainingData, true, SatisfyFasttextTrainingData, FasttextEmbeddingSchema, Constraint, Some("Creating Fasttext Training Dataset".to_string()), Some("We download Fasttext data from a Hive table.".to_string()), |_, _| true, |_, _| Vec::new(), ConvertJSONTableToORCTable);

// Constraint: TrainFasttextModel, rooted at a FasttextEmbeddingSchema concept.
// Depends on FasttextTrainingData. Attach-predicate is unconditionally true
// for this root type; required-programs closure returns no entries.
define_constraint!(TrainFasttextModel, true, SatisfyTrainFasttextModel, FasttextEmbeddingSchema, Constraint, Some("Training Fasttext Model".to_string()), Some("This operation trains the Fasttext model and saves
a dataset mapping words to their embeddings to a local
file.".to_string()), |_, _| true, |_, _| Vec::new(), FasttextTrainingData);

// Constraint: UploadFasttextToMinio, rooted at a FasttextEmbeddingSchema concept.
// Depends on TrainFasttextModel. Attach-predicate: walks the ancestry's
// FasttextEmbedding -> LocalStorageSetup -> HiveTableStorage -> MinioLocation
// chain and fires only when every level matches; any mismatch or failed
// lookup yields false.
define_constraint!(UploadFasttextToMinio, true, SatisfyUploadFasttextToMinio, FasttextEmbeddingSchema, Constraint, Some("Upload data to Min.IO".to_string()), Some("Now that data has been pre-processed we can upload it to the underlying
Min.IO storage.
".to_string()), |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
    match ancestry.fasttext_embedding(root.clone()) {
        Ok(x) => match *x.0.read().unwrap().setup.0.read().unwrap() {
            // Each level is AoristRef-wrapped, hence the repeated .0.read().unwrap().
            aorist_core::StorageSetup::LocalStorageSetup(ref s) => match *s.0.read().unwrap().local.0.read().unwrap() {
                aorist_core::Storage::HiveTableStorage(ref h) => match *h.0.read().unwrap().location.0.read().unwrap() {
                    aorist_core::HiveLocation::MinioLocation(_) => true,
                    _ => false,
                },
                _ => false,
            },
            _ => false,
        }
        Err(_) => false,
    }
}
, |_, _| Vec::new(), TrainFasttextModel);

// Register every constraint defined above under the AoristConstraint enum.
// NOTE(review): each name listed here must match a define_constraint! above;
// keep this list in sync when adding or removing constraints. The order appears
// to follow the dependency chain (downloads -> schemas -> uploads -> fasttext),
// though whether registration order is semantically significant depends on
// register_constraint_new!'s definition — confirm before reordering.
register_constraint_new!(AoristConstraint, 'a, DownloadDataFromRemoteWebLocation
,    ORCTableSchemasCreated
,    HiveDirectoriesCreated
,    DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
,    JSONTableSchemasCreated
,    ReadyForUpload
,    ConvertJSONToCSV
,    UploadDataToMinio
,    UploadDataToLocal
,    ConvertJSONTableToORCTable
,    FasttextTrainingData
,    TrainFasttextModel
,    UploadFasttextToMinio);