import os
import requests
import redis
import unittest
from typing import Set
from cloudproof_findex import (
AuthorizationToken,
Findex,
IndexedValuesAndKeywords,
Keyword,
Location,
Key,
ProgressResults,
PythonCallbacks,
)
class TestStructures(unittest.TestCase):
def test_location(self) -> None:
input_string = 'test location'
loc_str = Location.from_string(input_string)
self.assertEqual(str(loc_str), input_string)
self.assertEqual(loc_str, input_string)
self.assertNotEqual(loc_str, 'wrong str')
self.assertEqual(hash(loc_str), hash(input_string))
input_int = 2**51 - 1
loi32 = Location.from_int(input_int)
self.assertEqual(int(loi32), input_int)
self.assertEqual(loi32, input_int)
self.assertNotEqual(loi32, 2**51 - 2)
input_bytes = b'test location'
loc_bytes = Location.from_bytes(input_bytes)
self.assertEqual(bytes(loc_bytes), input_bytes)
self.assertEqual(loc_bytes, input_bytes)
self.assertNotEqual(loc_bytes, b'wrong bytes')
self.assertEqual(hash(loc_bytes), hash(input_bytes))
self.assertEqual(loc_str, loc_bytes)
self.assertNotEqual(loc_str, loi32)
def test_keyword(self) -> None:
input_string = 'test keyword'
kw_str = Keyword.from_string(input_string)
self.assertEqual(str(kw_str), input_string)
self.assertEqual(kw_str, input_string)
self.assertNotEqual(kw_str, 'wrong str')
self.assertEqual(hash(kw_str), hash(input_string))
input_int = 2**51 - 1
kw_int = Keyword.from_int(input_int)
self.assertEqual(int(kw_int), input_int)
self.assertEqual(kw_int, input_int)
self.assertNotEqual(kw_int, 2**51 - 2)
input_bytes = b'test keyword'
kw_bytes = Keyword.from_bytes(input_bytes)
self.assertEqual(bytes(kw_bytes), input_bytes)
self.assertEqual(kw_bytes, input_bytes)
self.assertNotEqual(kw_bytes, b'wrong bytes')
self.assertEqual(hash(kw_bytes), hash(input_bytes))
self.assertEqual(kw_str, kw_bytes)
self.assertNotEqual(kw_str, kw_int)
def test_keys(self) -> None:
msk = Key.random()
self.assertIsInstance(msk, Key)
saved_bytes = msk.to_bytes()
reloaded_msk = Key.from_bytes(saved_bytes)
self.assertEqual(saved_bytes, reloaded_msk.to_bytes())
with self.assertRaises(ValueError):
Key.from_bytes(b'wrong size')
def define_custom_db_interface(is_with_test: bool = False):
table: dict = {}
def fetch(uids):
res = {}
for uid in uids:
if uid in table:
res[uid] = table[uid]
return res
def upsert(old_values: dict, new_values: dict):
res = {}
for uid, new_value in new_values.items():
current_value = table.get(uid)
old_value = old_values.get(uid)
if old_value == current_value:
table[uid] = new_value
elif not current_value:
raise ValueError('The current value needs to be defined as long as the old value is defined ')
else:
res[uid] = current_value
return res
def insert(items):
for uid, value in items.items():
if uid in table:
raise ValueError('collision in insert operation on UID: ' + uid)
table[uid] = value
def delete(uids):
for uid in uids:
table.pop(uid)
def dump_tokens():
return table.keys()
if is_with_test:
k1 = 'my first key'
k2 = 'my second key'
k3 = 'my third key'
v1 = [1, 2, 3]
v2 = [4, 5, 6]
v3 = [7, 8, 9]
res = upsert({}, {k1: v1})
assert not res
assert v1 == fetch([k1])[k1]
res = upsert({k1: v1}, {k1: v2})
assert not res
assert v2 == fetch([k1])[k1]
res = upsert({k1: v1}, {k1: v3})
assert res == {k1: v2}
assert v2 == fetch([k1])[k1]
assert {k1} == dump_tokens()
insert({k3: v1})
assert v1 == fetch([k3])[k3]
assert not fetch([k2])
try:
insert({k1: v2})
raise ValueError('collision on key: ' + k1)
except:
pass
table = {}
in_memory_db_interface = PythonCallbacks.new()
in_memory_db_interface.set_fetch(fetch)
in_memory_db_interface.set_upsert(upsert)
in_memory_db_interface.set_insert(insert)
in_memory_db_interface.set_delete(delete)
in_memory_db_interface.set_dump_tokens(dump_tokens)
return in_memory_db_interface
class TestFindex(unittest.TestCase):
def setUp(self) -> None:
self.findex_key = Key.random()
self.label = "My label."
self.db = {
1: ['Martin', 'Sheperd'],
2: ['Martial', 'Wilkins'],
3: ['John', 'Sheperd'],
}
rest_server_url = 'http://localhost:8080'
res = requests.post(
rest_server_url + '/indexes',
headers={'Content-Type': 'application/json'},
json={'name': 'Test'},
timeout=5,
)
response = res.json()
token = AuthorizationToken.new(
index_id=response['public_id'],
findex_key=Key.random(),
fetch_entries_key=Key.from_bytes(response['fetch_entries_key']),
fetch_chains_key=Key.from_bytes(response['fetch_chains_key']),
upsert_entries_key=Key.from_bytes(response['upsert_entries_key']),
insert_chains_key=Key.from_bytes(response['insert_chains_key']),
)
in_memory_db_interface = define_custom_db_interface()
sqlite_path = '/tmp/cloudproof_findex.sqlite'
redis_host = 'localhost'
redis_port = 6379
redis_url = f'redis://{redis_host}:{redis_port}'
if os.path.exists(sqlite_path):
os.remove(sqlite_path)
r = redis.Redis(host=redis_host, port=redis_port, db=0)
print(redis_url)
r.flushdb()
self.findex_interfaces = {
'sqlite': Findex.new_with_sqlite_interface(
self.findex_key,
self.label,
sqlite_path,
),
'redis': Findex.new_with_redis_interface(
self.findex_key,
self.label,
redis_url,
),
'rest': Findex.new_with_rest_interface(self.label,
str(token),
rest_server_url),
'custom': Findex.new_with_custom_interface(
self.findex_key, self.label, in_memory_db_interface, in_memory_db_interface
),
}
def test_upsert(self) -> None:
indexed_values_and_keywords: IndexedValuesAndKeywords = {
Location.from_int(k): v for k, v in self.db.items()
}
for interface, instance in self.findex_interfaces.items():
print('Test upserting on {} interface.', interface)
res = instance.add(indexed_values_and_keywords)
self.assertEqual(len(res), 5)
res = instance.add({Location.from_int(4): ['John', 'Snow']})
self.assertEqual(res, set(['Snow']))
def test_upsert_search(self) -> None:
indexed_values_and_keywords: IndexedValuesAndKeywords = {
Location.from_int(k): v for k, v in self.db.items()
}
for interface, instance in self.findex_interfaces.items():
print('Test upserting and search on {} interface.', interface)
instance.add(indexed_values_and_keywords)
res = instance.search([Keyword.from_bytes(b'Martial')])
self.assertEqual(len(res), 1)
self.assertEqual(len(res[Keyword.from_string('Martial')]), 1)
self.assertEqual(int(res['Martial'][0]), 2)
res = instance.search(['Sheperd', 'Wilkins'])
self.assertEqual(len(res['Sheperd']), 2)
self.assertEqual(len(res['Wilkins']), 1)
def test_graph_upsert_search(self) -> None:
indexed_values_and_keywords: IndexedValuesAndKeywords = {
Location.from_int(k): v for k, v in self.db.items()
}
for interface, instance in self.findex_interfaces.items():
print(f'Test graph upserting and search on {interface} interface.')
instance.add(indexed_values_and_keywords)
graph: IndexedValuesAndKeywords = {
Keyword.from_string('Mart'): ['Mar'],
Keyword.from_string('Marti'): ['Mart'],
Keyword.from_string('Martin'): ['Marti'],
Keyword.from_string('Martia'): ['Marti'],
Keyword.from_string('Martial'): ['Martia'],
}
instance.add(graph)
res = instance.search(['Mar'])
self.assertEqual(len(res['Mar']), 2)
def false_progress_callback(res: ProgressResults) -> bool:
self.assertEqual(len(res['Mar']), 1)
return True
res = instance.search(
['Mar'],
interrupt=false_progress_callback,
)
self.assertEqual(len(res['Mar']), 0)
def early_stop_progress_callback(res: ProgressResults) -> bool:
if 'Martin' in res:
return True
return False
res = instance.search(
['Mar'],
interrupt=early_stop_progress_callback,
)
self.assertEqual(len(res['Mar']), 1)
def test_compact(self) -> None:
indexed_values_and_keywords: IndexedValuesAndKeywords = {
Location.from_int(k): v for k, v in self.db.items()
}
interfaces = [
(interface, instance)
for interface, instance in self.findex_interfaces.items()
if interface == 'sqlite'
]
for interface, instance in interfaces:
print(f'Test compacting and search on {interface} interface.')
instance.add(indexed_values_and_keywords)
new_label = "My renewed label"
filtered_locations = { Location.from_int(2) }
def filter_obsolete_data(dataset: Set[Location]):
res = set()
for data in dataset:
if data not in filtered_locations:
res.add(data)
return res
instance.compact(self.findex_key, new_label, 1, filter_obsolete_data)
res = instance.search(['Sheperd'])
self.assertEqual(len(res['Sheperd']), 2)
res = instance.search(['Martial', 'Wilkins'])
self.assertEqual(len(res['Martial']), 0)
self.assertEqual(len(res['Wilkins']), 0)
if __name__ == '__main__':
define_custom_db_interface(True)
unittest.main()