//! Constraint definitions for the aorist data-replication / ETL pipeline.
//! (NOTE(review): this line previously held stray line-number residue from a
//! file-extraction step — "1 2 3 … 114" — which was not valid Rust.)
// This file declares the constraint DAG for replicating remote data into a
// Hive warehouse and training a Fasttext model on it.
//
// `define_constraint!` and `register_constraint_new!` are project macros
// (defined elsewhere in the crate). From the call sites here, the positional
// arguments to `define_constraint!` appear to be:
//   1. constraint name, 2. a bool flag (true for most, false for
//   `ReadyForUpload` / `UploadDataToLocal`), 3. the `Satisfy*` type name,
//   4. the root concept type the constraint attaches to, 5. the constraint
//   base type, 6. optional title, 7. optional description,
//   8. a `(root, ancestry) -> bool` applicability closure,
//   9. a `(_, _) -> Vec<_>` closure, 10+. previously defined constraints
//   (presumably upstream dependencies — TODO confirm against the macro
//   definition).
// NOTE(review): `Uuid` is imported but not referenced in this chunk —
// presumably consumed by the macro expansions; verify before removing.
// NOTE(review): `debug!` is used below with no visible import — presumably a
// `#[macro_use]` or prelude import elsewhere in the crate.
use uuid::Uuid;
use aorist_core::{
    PushshiftAPILocation, HiveLocation, RemoteStorage, DataSet, MinioLocation,
    HiveTableStorage, WebLocation, OnPremiseLocation, FasttextEmbeddingSchema
};

// Attaches to every `WebLocation`; unconditionally applicable (`|_, _| true`).
define_constraint!(
    DownloadDataFromRemoteWebLocation,
    true,
    SatisfyDownloadDataFromRemoteWebLocation,
    WebLocation,
    Constraint,
    Some("Downloading data from remote web location".to_string()),
    Some("Data for this particular asset(s) is located somewhere on the web. We need to download it to a local directory first, before we can do anything with it.".to_string()),
    |_, _| true,
    |_, _| Vec::new()
);

// Attaches to `HiveTableStorage`; applicable only when an ancestor provides a
// `ReplicationStorageSetup`.
define_constraint!(
    ORCTableSchemasCreated,
    true,
    SatisfyORCTableSchemasCreated,
    HiveTableStorage,
    Constraint,
    Some("Creating Table Schemas".to_string()),
    Some("We will be uploading tabular data into our warehouse. Before we upload data files we need to create schemas for the tables which will refer to these files. ".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok(),
    |_, _| Vec::new()
);

// Attaches to `HiveLocation`; requires BOTH a `HiveTableStorage` and a
// `ReplicationStorageSetup` ancestor.
define_constraint!(
    HiveDirectoriesCreated,
    true,
    SatisfyHiveDirectoriesCreated,
    HiveLocation,
    Constraint,
    Some("Created hive directories.".to_string()),
    Some("We need to create directories or buckets (depending on file system / storage solution) in which we will store our Hive data. ".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| ancestry.hive_table_storage(root.clone()).is_ok() && ancestry.replication_storage_setup(root.clone()).is_ok(),
    |_, _| Vec::new()
);

// Attaches to `PushshiftAPILocation`; applicable when the replication setup's
// temporary encoding is newline-delimited JSON OR CSV. Despite the name, CSV
// tmp encodings are accepted too — NOTE(review): confirm that is intentional.
// (The description string below deliberately ends with a trailing space and a
// literal newline — preserved byte-for-byte.)
define_constraint!(
    DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON,
    true,
    SatisfyDownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON,
    PushshiftAPILocation,
    Constraint,
    Some("Downloading data from the Pushshift API".to_string()),
    Some("Data for this particular asset(s) is located in the Pushshift API. We need to download it to a local directory first, before we can do anything with it. 
".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => {
            debug!("Root has ReplicationStorageSetup.");
            // Lock the setup, then its `tmp_encoding`, and accept NDJSON/CSV.
            match *x.0.read().unwrap().tmp_encoding.0.read().unwrap() {
                aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
                aorist_core::Encoding::CSVEncoding(_) => true,
                _ => false,
            }
        }
        _ => false,
    },
    |_, _| Vec::new(),
    HiveDirectoriesCreated
);

// Attaches to `HiveTableStorage`; applicable only when the storage's encoding
// is newline-delimited JSON. NOTE(review): `.unwrap()` here panics if the root
// has no `HiveTableStorage` ancestor — presumably guaranteed by the macro's
// root-type binding; confirm.
define_constraint!(
    JSONTableSchemasCreated,
    true,
    SatisfyJSONTableSchemasCreated,
    HiveTableStorage,
    Constraint,
    Some("Create schemas for temporary JSON tables.".to_string()),
    Some("We will use Hive tables with external storage as a staging location for our data. We need to create these schemas to be able to write data to them. ".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
        match &*ancestry.hive_table_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap() {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
            _ => false,
        }
    },
    |_, _| Vec::new(),
    HiveDirectoriesCreated
);

// Dataset-level synchronization point (flag is `false`, unlike most others —
// TODO confirm what the flag controls). No description; always applicable.
define_constraint!(
    ReadyForUpload,
    false,
    SatisfyReadyForUpload,
    DataSet,
    Constraint,
    Some("Convert data".to_string()),
    None,
    |_, _| true,
    |_, _| Vec::new(),
    DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
);

// Attaches to `RemoteStorage`; applicable when the remote encoding is NDJSON
// AND a replication setup exists. (Description ends with a literal newline.)
define_constraint!(
    ConvertJSONToCSV,
    true,
    SatisfyConvertJSONToCSV,
    RemoteStorage,
    Constraint,
    Some("Convert JSON data to CSV".to_string()),
    Some("We need to convert the JSON data to CSV format to process it further. 
".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
        match *ancestry.remote_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap() {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => {
                ancestry.replication_storage_setup(root.clone()).is_ok()
            },
            _ => false,
        }
    },
    |_, _| Vec::new(),
    DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
);

// Attaches to `MinioLocation`; gated on a replication setup ancestor.
define_constraint!(
    UploadDataToMinio,
    true,
    SatisfyUploadDataToMinio,
    MinioLocation,
    Constraint,
    Some("Upload data to Min.IO".to_string()),
    Some("Now that data has been pre-processed we can upload it to the underlying Min.IO storage. ".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
        ancestry.replication_storage_setup(root.clone()).is_ok()
    },
    |_, _| Vec::new(),
    ReadyForUpload,
    HiveDirectoriesCreated
);

// Attaches to `OnPremiseLocation`; untitled/undescribed, flag `false`.
define_constraint!(
    UploadDataToLocal,
    false,
    SatisfyUploadDataToLocal,
    OnPremiseLocation,
    Constraint,
    None,
    None,
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok(),
    |_, _| Vec::new(),
    UploadDataToMinio
);

// Attaches to `HiveTableStorage`; applicable when the enclosing static data
// table's setup is a `ReplicationStorageSetup`.
// (Description ends with a literal newline.)
define_constraint!(
    ConvertJSONTableToORCTable,
    true,
    SatisfyConvertJSONTableToORCTable,
    HiveTableStorage,
    Constraint,
    Some("Convert JSON Table to ORC Table".to_string()),
    Some("Hive tables can be stored in external JSON format, but this is inefficient. We can convert them to ORC (the native Hive format) to speed up access. 
".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()) {
        Ok(sdt) => match &*sdt.0.read().unwrap().setup.0.read().unwrap() {
            aorist_core::StorageSetup::ReplicationStorageSetup(_) => true,
            _ => false,
        },
        _ => false,
    },
    |_, _| Vec::new(),
    JSONTableSchemasCreated,
    UploadDataToLocal,
    ORCTableSchemasCreated
);

// Fasttext pipeline step 1: build training data from the replicated table.
define_constraint!(
    FasttextTrainingData,
    true,
    SatisfyFasttextTrainingData,
    FasttextEmbeddingSchema,
    Constraint,
    Some("Creating Fasttext Training Dataset".to_string()),
    Some("We download Fasttext data from a Hive table.".to_string()),
    |_, _| true,
    |_, _| Vec::new(),
    ConvertJSONTableToORCTable
);

// Fasttext pipeline step 2: train the model on the dataset from step 1.
define_constraint!(
    TrainFasttextModel,
    true,
    SatisfyTrainFasttextModel,
    FasttextEmbeddingSchema,
    Constraint,
    Some("Training Fasttext Model".to_string()),
    Some("This operation trains the Fasttext model and saves a dataset mapping words to their embeddings to a local file.".to_string()),
    |_, _| true,
    |_, _| Vec::new(),
    FasttextTrainingData
);

// Fasttext pipeline step 3: applicable only when the embedding's setup is a
// `LocalStorageSetup` whose local storage is a `HiveTableStorage` located in
// Min.IO — i.e. the match drills setup -> local -> location.
// (Description ends with a literal newline.)
define_constraint!(
    UploadFasttextToMinio,
    true,
    SatisfyUploadFasttextToMinio,
    FasttextEmbeddingSchema,
    Constraint,
    Some("Upload data to Min.IO".to_string()),
    Some("Now that data has been pre-processed we can upload it to the underlying Min.IO storage. 
".to_string()),
    |root: AoristRef<Concept>, ancestry: &ConceptAncestry| {
        match ancestry.fasttext_embedding(root.clone()) {
            Ok(x) => match *x.0.read().unwrap().setup.0.read().unwrap() {
                aorist_core::StorageSetup::LocalStorageSetup(ref s) => match *s.0.read().unwrap().local.0.read().unwrap() {
                    aorist_core::Storage::HiveTableStorage(ref h) => match *h.0.read().unwrap().location.0.read().unwrap() {
                        aorist_core::HiveLocation::MinioLocation(_) => true,
                        _ => false,
                    },
                    _ => false,
                },
                _ => false,
            }
            // No comma after the `Ok` arm above: block-expression match arms
            // may legally omit it.
            Err(_) => false,
        }
    },
    |_, _| Vec::new(),
    TrainFasttextModel
);

// Register every constraint defined above under `AoristConstraint`. Keep this
// list in sync when adding/removing `define_constraint!` invocations.
register_constraint_new!(
    AoristConstraint,
    'a,
    DownloadDataFromRemoteWebLocation,
    ORCTableSchemasCreated,
    HiveDirectoriesCreated,
    DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON,
    JSONTableSchemasCreated,
    ReadyForUpload,
    ConvertJSONToCSV,
    UploadDataToMinio,
    UploadDataToLocal,
    ConvertJSONTableToORCTable,
    FasttextTrainingData,
    TrainFasttextModel,
    UploadFasttextToMinio
);